[GitHub] spark issue #19810: [SPARK-22599][SQL] In-Memory Table Pruning without Extra...

2018-09-19 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/19810
  
@maropu, this PR looks rather cold by now :sunglasses:, but it is extremely interesting and relevant.


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2018-09-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r218743585
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel}
+
+private[columnar] class CachedColumnarRDD(
+@transient private var _sc: SparkContext,
+private var dataRDD: RDD[CachedBatch],
+private[columnar] val containsPartitionMetadata: Boolean,
+expectedStorageLevel: StorageLevel)
+  extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) {
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[CachedBatch] = {
+firstParent.iterator(split, context)
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+CachedColumnarRDD.allMetadataFetched.remove(id)
+CachedColumnarRDD.rddIdToMetadata.remove(id)
+super.unpersist(blocking)
+  }
+
+  override protected def getPartitions: Array[Partition] = 
dataRDD.partitions
+
+  override private[spark] def getOrCompute(split: Partition, context: 
TaskContext):
+  Iterator[CachedBatch] = {
+val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index)
+val superGetOrCompute: (Partition, TaskContext) => 
Iterator[CachedBatch] = super.getOrCompute
+
SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(_ =>
+  superGetOrCompute(split, context)
+).getOrElse {
+  val batchIter = superGetOrCompute(split, context)
+  if (containsPartitionMetadata && getStorageLevel != 
StorageLevel.NONE && batchIter.hasNext) {
+val cachedBatch = batchIter.next()
--- End diff --

Assert `!batchIter.hasNext` here, since you expect this partition to contain a single batch.
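
A sketch of the suggested assertion, placed right after pulling the first batch (a fragment of the diff above; the assertion message is illustrative):

```scala
val cachedBatch = batchIter.next()
// the metadata branch assumes exactly one CachedBatch per partition
assert(!batchIter.hasNext, "expected a single CachedBatch per partition")
```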


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2018-09-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r218716111
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel}
+
+private[columnar] class CachedColumnarRDD(
+@transient private var _sc: SparkContext,
+private var dataRDD: RDD[CachedBatch],
+private[columnar] val containsPartitionMetadata: Boolean,
+expectedStorageLevel: StorageLevel)
+  extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) {
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[CachedBatch] = {
+firstParent.iterator(split, context)
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+CachedColumnarRDD.allMetadataFetched.remove(id)
+CachedColumnarRDD.rddIdToMetadata.remove(id)
+super.unpersist(blocking)
+  }
+
+  override protected def getPartitions: Array[Partition] = 
dataRDD.partitions
+
+  override private[spark] def getOrCompute(split: Partition, context: 
TaskContext):
+  Iterator[CachedBatch] = {
--- End diff --

Can this be avoided by maintaining two (zipped) RDDs, one of `CachedBatch`es and the other holding only the stats?
Could that approach avoid the need for a specialized block type for managing metadata?
Correct me if I'm wrong, but the first time the stats are accessed, your approach performs a full scan to extract them (this happens in the SQL code), so having a second RDD along the lines of `batches.map(_.stats).persist` should give the same behavior, right?
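
A minimal sketch of the suggested second RDD (illustrative fragment only; `batches`, `statsRDD`, and the chosen storage level are assumptions, not the PR's actual code):

```scala
// batches: RDD[CachedBatch] built by InMemoryRelation (assumed to be in scope)
val statsRDD: RDD[InternalRow] = batches.map(_.stats)
statsRDD.persist(StorageLevel.MEMORY_AND_DISK)

// the first action touching statsRDD scans the batches once to extract the
// stats; after that, pruning can read the cached stats without needing a
// specialized metadata block type
val partitionStats: Array[InternalRow] = statsRDD.collect()
```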


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2018-09-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r218714493
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel}
+
+private[columnar] class CachedColumnarRDD(
+@transient private var _sc: SparkContext,
+private var dataRDD: RDD[CachedBatch],
+private[columnar] val containsPartitionMetadata: Boolean,
+expectedStorageLevel: StorageLevel)
+  extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) {
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[CachedBatch] = {
+firstParent.iterator(split, context)
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+CachedColumnarRDD.allMetadataFetched.remove(id)
+CachedColumnarRDD.rddIdToMetadata.remove(id)
+super.unpersist(blocking)
+  }
+
+  override protected def getPartitions: Array[Partition] = 
dataRDD.partitions
+
+  override private[spark] def getOrCompute(split: Partition, context: 
TaskContext):
+  Iterator[CachedBatch] = {
+val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index)
+val superGetOrCompute: (Partition, TaskContext) => 
Iterator[CachedBatch] = super.getOrCompute
+
SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(_ =>
+  superGetOrCompute(split, context)
+).getOrElse {
+  val batchIter = superGetOrCompute(split, context)
+  if (containsPartitionMetadata && getStorageLevel != 
StorageLevel.NONE && batchIter.hasNext) {
+val cachedBatch = batchIter.next()
+SparkEnv.get.blockManager.putSingle(metadataBlockId, 
cachedBatch.stats,
+  expectedStorageLevel)
+new InterruptibleIterator[CachedBatch](context, 
Iterator(cachedBatch))
+  } else {
+batchIter
+  }
+}
+  }
+}
+
+private[columnar] object CachedColumnarRDD {
+
+  private val rddIdToMetadata = new ConcurrentHashMap[Int, 
mutable.ArraySeq[Option[InternalRow]]]()
--- End diff --

Could these be moved to become members of the RDD class? They look like maps of `this -> some.property`, which in this case could simply be instance members.
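
A minimal, self-contained sketch of that suggestion (the class names `Before`/`After` and the metadata type are illustrative, not the PR's actual types):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// today: state lives in a companion-object map keyed by the RDD id, so every
// instance must remember to remove its entry (e.g. in unpersist) or it leaks
class Before(val id: Int)
object Before {
  val rddIdToMetadata = new ConcurrentHashMap[Int, mutable.ArraySeq[Option[String]]]()
}

// suggestion: make it an instance member, so its lifecycle is tied to the
// instance itself and no cross-instance bookkeeping is needed
class After(val id: Int, numPartitions: Int) {
  private val partitionMetadata: mutable.ArraySeq[Option[String]] =
    mutable.ArraySeq.fill[Option[String]](numPartitions)(None)
}
```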


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2018-09-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r218705219
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 ---
@@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
 
   private val inMemoryPartitionPruningEnabled = 
sqlContext.conf.inMemoryPartitionPruning
 
+  private def doFilterCachedBatches(
+  rdd: RDD[CachedBatch],
+  partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
+val schemaIndex = partitionStatsSchema.zipWithIndex
+rdd.mapPartitionsWithIndex {
+  case (partitionIndex, cachedBatches) =>
+if (inMemoryPartitionPruningEnabled) {
+  cachedBatches.filter { cachedBatch =>
+val partitionFilter = newPredicate(
--- End diff --

Can this be pulled up out of `usePartitionLevelMetadata`? It looks like you're constructing the predicate once per record (per `CachedBatch`) instead of once per partition.
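
A sketch of the suggested hoisting, as a fragment of the method in the diff above (it assumes the surrounding `InMemoryTableScanExec` context, i.e. `partitionFilters`, `newPredicate`, `And`, `Literal`, and `inMemoryPartitionPruningEnabled` are in scope):

```scala
rdd.mapPartitionsWithIndex { case (partitionIndex, cachedBatches) =>
  if (inMemoryPartitionPruningEnabled) {
    // built once per partition, not once per CachedBatch
    val partitionFilter = newPredicate(
      partitionFilters.reduceOption(And).getOrElse(Literal(true)),
      partitionStatsSchema)
    partitionFilter.initialize(partitionIndex)
    cachedBatches.filter(batch => partitionFilter.eval(batch.stats))
  } else {
    cachedBatches
  }
}
```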


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2018-09-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r218704390
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 ---
@@ -52,6 +52,68 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
InternalRow)
 
+private[columnar] class CachedBatchIterator(
+rowIterator: Iterator[InternalRow],
+output: Seq[Attribute],
+batchSize: Int,
+useCompression: Boolean,
+batchStats: LongAccumulator,
+singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] {
+
+  def next(): CachedBatch = {
+val columnBuilders = output.map { attribute =>
+  ColumnBuilder(attribute.dataType, batchSize, attribute.name, 
useCompression)
+}.toArray
+
+var rowCount = 0
+var totalSize = 0L
+
+val terminateLoop = (singleBatch: Boolean, rowIter: 
Iterator[InternalRow],
+  rowCount: Int, size: Long) => {
+  if (!singleBatch) {
+rowIter.hasNext && rowCount < batchSize && totalSize < 
ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE
+  } else {
+rowIter.hasNext
--- End diff --

doesn't this run the risk of OOM for large partitions?


---




[GitHub] spark pull request #22432: [SPARK-22713][CORE][TEST][FOLLOWUP] Fix flaky Ext...

2018-09-15 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/22432#discussion_r217902092
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -457,7 +458,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
 // 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
 // (lines 69-89)
 // assert(map.currentMap == null)
-eventually {
+eventually(timeout(5 seconds), interval(200 milliseconds)) {
--- End diff --

@dongjoon-hyun, thanks for cleaning up my mess! :sunglasses: 


---




[GitHub] spark pull request #22432: [SPARK-22713][CORE][TEST][FOLLOWUP] Fix flaky Ext...

2018-09-15 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/22432#discussion_r217901988
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -457,7 +458,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
 // 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
 // (lines 69-89)
 // assert(map.currentMap == null)
-eventually {
+eventually(timeout(5 seconds), interval(200 milliseconds)) {
--- End diff --

I think the best practice for addressing this is specifying the patience configuration in sbt's test options.
Having said that, I once had to grant a longer timeout to a specific class, which I achieved by overriding the `spanScaleFactor` method:

`override def spanScaleFactor: Double = super.spanScaleFactor * 3`

Please notice that there's another usage of `eventually` in line 519; that one 'manually' waits 500 millis before testing, which might explain why you didn't see it failing in CI. Looking at it now, it seems like bad practice, since `eventually` is designed to control both the timeout and the interval between retries.

To summarize: the best practice is to control this in sbt's test settings; if needed you can further control it in a specific class. In any case you have to make sure you handle all invocations of `eventually`, which is easier and less error prone by leveraging ScalaTest's mechanisms.
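
For reference, a minimal sketch of a suite-wide patience configured via ScalaTest itself (assuming ScalaTest 3.0.x and its `Eventually` trait; the suite name, spans, and test body are illustrative):

```scala
import java.util.concurrent.atomic.AtomicBoolean

import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Millis, Seconds, Span}

class MyFlakySuite extends FunSuite with Eventually {
  // applies to every eventually { ... } block in this suite, so individual
  // call sites don't need to repeat explicit timeouts/intervals
  implicit override val patienceConfig: PatienceConfig =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(200, Millis))

  test("waits for the condition using the suite-wide patience") {
    val flag = new AtomicBoolean(false)
    new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(300); flag.set(true) }
    }).start()
    eventually { assert(flag.get()) }
  }
}
```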


---




[GitHub] spark pull request #17400: [SPARK-19981][SQL] Respect aliases in output part...

2018-08-21 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/17400#discussion_r211565064
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -321,6 +321,58 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 }
   }
 
+  private def updatePartitioningByAliases(exprs: Seq[NamedExpression], 
partioning: Partitioning)
+: Partitioning = {
+val aliasSeq = exprs.flatMap(_.collectFirst {
--- End diff --

@maropu, I wasn't aiming to support complex partitioning expressions (which deserve their own separate PR); I meant that this code could introduce regressions by 'over-capturing' nested aliases.

* My specific example is wrong, since the struct is transformed into a named struct (the alias is replaced by an explicit name).


---




[GitHub] spark issue #17400: [SPARK-19981][SQL] Update output partitioning info. when...

2018-08-15 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/17400
  
Thanks @maropu, I appreciate this.
I must say I'm pretty surprised a bug like that has lived for so long...


---




[GitHub] spark issue #17400: [SPARK-19981][SQL] Update output partitioning info. when...

2018-08-14 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/17400
  
In my use case, I aggregate a dataset, then use `select` to align columns with a case class. I later try to join the resulting dataset based on the same columns used for the aggregation.
The join introduces shuffles (Exchange nodes), as sketched below.
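
A hypothetical sketch of that use case (the column names, the `Agg` case class, and the datasets are made up for illustration, not taken from the actual workload):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

case class Agg(k1: String, k2: String, total: Long)

val spark = SparkSession.builder().master("local[*]").appName("repro").getOrCreate()
import spark.implicits._

val df = Seq(("a", "x", 1L), ("a", "x", 2L), ("b", "y", 3L)).toDF("k1", "k2", "amount")
val other = Seq(("a", "x", "foo"), ("b", "y", "bar")).toDF("k1", "k2", "label")

val aggregated = df
  .groupBy($"k1", $"k2")
  .agg(sum($"amount") as "total")
  .select($"k1", $"k2", $"total")  // align columns with the case class
  .as[Agg]

// joining on the same keys used for the aggregation: ideally the existing hash
// partitioning would be reused, but an extra Exchange shows up in the plan
val joined = aggregated.join(other, Seq("k1", "k2"))
joined.explain()  // inspect the plan for Exchange nodes
```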



---




[GitHub] spark issue #17400: [SPARK-19981][SQL] Update output partitioning info. when...

2018-08-14 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/17400
  
@maropu , yes it does :-)


---




[GitHub] spark pull request #17400: [SPARK-19981][SQL] Update output partitioning inf...

2018-08-14 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/17400#discussion_r209909106
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -321,6 +321,58 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 }
   }
 
+  private def updatePartitioningByAliases(exprs: Seq[NamedExpression], 
partioning: Partitioning)
+: Partitioning = {
+val aliasSeq = exprs.flatMap(_.collectFirst {
--- End diff --

This might do more than you'd like it to (at least if it behaves the way I understand `collectFirst`), e.g.
`df.select($"x" as "x1", struct($"a" as "a1", $"b" as "b1") as "s1")`

_x1_ and _s1_ are aliases at the top level of the projection list; _a1_ and _b1_ are not (they're nested inside the struct). It could get even more complicated if there were an _a1_ alias in the top-level projection list, as in the sketch below.
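
An illustrative sketch of the over-capturing concern, assuming Catalyst's `Alias` and the tree-traversing `collectFirst` on expressions (the variable names are made up):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression}

// exprs: Seq[NamedExpression] -- the projection list of the select above

// tree traversal: if a top-level expression is not itself an Alias but contains
// one (e.g. struct($"a" as "a1", ...)), the nested alias is captured instead
val possiblyOverCaptured = exprs.flatMap(_.collectFirst { case a: Alias => a })

// restricting the match to the top level of the projection list avoids that
val topLevelOnly = exprs.collect { case a: Alias => a }
```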


---




[GitHub] spark issue #17400: [SPARK-19981][SQL] Update output partitioning info. when...

2018-08-14 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/17400
  
@maropu, any reason why this has been on hold for so long?


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-08-13 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/21369
  
retest this please


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-08-13 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r209499005
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

@cloud-fan, do you think this is worth doing? I'm referring to the `CompletionIterator` delaying GC of its sub-iterator and cleanup function (usually a closure referring to a larger collection).
If so, I'd open a separate JIRA + PR for this.


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-08-11 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/21369
  
@hvanhovell, thanks for picking this up :sunglasses: 


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-08-11 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/21369
  
retest this please


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-06-28 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r198816624
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

@cloud-fan ?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-06-03 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r192631230
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

@cloud-fan , can we move on with this?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-30 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r191655937
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

@cloud-fan, how do you suggest we progress with this?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-27 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r191077231
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

@cloud-fan, I've tried the `System.gc` approach with and without `eventually`, to no avail.
It seems like ScalaTest's asserts use some black-magic, macro-based voodoo to generate a local method/val (when I got about 10 classes deep into my quest, I gave up).
Basically, what I'm seeing in VisualVM is the expected ref via the `WeakReference` and another surprising one via `UnaryMacroBool.left`.

It seems this sneaky ref was generated by the following assertion: `assert(map.currentMap == null)`, but it could have easily been generated elsewhere.

@cloud-fan, @gatorsmile, can you please confirm whether and how we can import the Scala code? Otherwise, can you think of an alternative approach for testing this?
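
For context, a minimal sketch of the `WeakReference` + `System.gc` style of leak check being discussed (in the spirit of Scala's `AssertUtil.assertNotReachable`; the helper name and the plain `require` are illustrative, and ScalaTest's macro asserts are deliberately avoided since they can capture the reference):

```scala
import java.lang.ref.WeakReference

// checks that the object produced by mkObj becomes unreachable after `use` is done with it
def assertCollectable[A <: AnyRef](mkObj: () => A)(use: A => Unit): Unit = {
  var obj: AnyRef = mkObj()
  val weak = new WeakReference(obj)
  use(obj.asInstanceOf[A])
  obj = null                                   // drop the only strong reference we hold
  var attempts = 0
  while (weak.get != null && attempts < 10) {  // GC is advisory, so retry a few times
    System.gc()
    Thread.sleep(50)
    attempts += 1
  }
  require(weak.get == null, "object is still strongly reachable after GC")
}
```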


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-25 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190829341
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends 
SparkFunSuite with LocalSparkContext {
 sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
--- End diff --

this requires using something like [scalatest's 
eventually](http://doc.scalatest.org/1.8/org/scalatest/concurrent/Eventually.html),
 don't you think?


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-05-24 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/21369
  
@cloud-fan, I guess we can copy the test util from Scala as long as it doesn't violate licensing or credits. Please advise on the proper way of doing that and where you want to place the code. Does Spark have a testing lib that can be reused?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-24 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190542635
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
+val underlyingIt = map.readingIterator
+assert( underlyingIt != null )
+val underlyingMapIterator = underlyingIt.upstream
+assert(underlyingMapIterator != null)
+val underlyingMapIteratorClass = underlyingMapIterator.getClass
+assert(underlyingMapIteratorClass.getEnclosingClass == 
classOf[AppendOnlyMap[_, _]])
+
+val underlyingMap = map.currentMap
+assert(underlyingMap != null)
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert( map.numSpills == 0 )
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+assert(map.currentMap == null)
+assert(underlyingIt.upstream ne underlyingMapIterator)
+assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
+assert(underlyingIt.upstream.getClass.getEnclosingClass != 
classOf[AppendOnlyMap[_, _]])
--- End diff --

Hmm, we can in line 508 but not in this test.
In this test we look at the iterator immediately after a spill; at this point `upstream` is supposed to be replaced by a `DiskMapIterator`, so I guess we can check for this directly (after relaxing its visibility to package private).

In line 508, we can simply compare with `Iterator.empty`.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190375595
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

@cloud-fan, the assumption here is that there are two references to the underlying map: the upstream iterator and the external map itself.
The destroy method first removes the ref held via the upstream iterator, then delegates to the method that clears the ref held via the external map member (`currentMap`, I think), so unless we've missed another ref we should be fine.

As I wrote above, I think there's a potentially more fundamental issue with `CompletionIterator`, which keeps holding references via its `sub` and `completionFunction` members; these might prevent some objects from being collected and could be released upon exhaustion of the iterator. There might be some more 'candidates' like `LazyIterator` and `InterruptibleIterator`; I think this deserves some more investigation.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190372506
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
--- End diff --

@cloud-fan, the assumption here is that there are two references to the underlying map: the upstream iterator and the external map itself.
The destroy method first removes the ref held via the upstream iterator, then delegates to the method that clears the ref held via the external map member (`currentMap`, I think), so unless we've missed another ref we should be fine.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190371425
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
+}
+
+private[ExternalAppendOnlyMap]
--- End diff --

Hmm... the class itself is private (slightly relaxed to package private to ease testing), so I'm not sure what the benefit of making the method public would be.
In any case, I think that once we see a use case for making this method public, we'd probably have to further change the iterator/external map classes.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190370842
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
+val underlyingIt = map.readingIterator
+assert( underlyingIt != null )
+val underlyingMapIterator = underlyingIt.upstream
+assert(underlyingMapIterator != null)
+val underlyingMapIteratorClass = underlyingMapIterator.getClass
+assert(underlyingMapIteratorClass.getEnclosingClass == 
classOf[AppendOnlyMap[_, _]])
+
+val underlyingMap = map.currentMap
+assert(underlyingMap != null)
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert( map.numSpills == 0 )
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+assert(map.currentMap == null)
+assert(underlyingIt.upstream ne underlyingMapIterator)
+assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
+assert(underlyingIt.upstream.getClass.getEnclosingClass != 
classOf[AppendOnlyMap[_, _]])
+
+val next50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert(!it.hasNext)
+val keys = (first50Keys ++ next50Keys).sorted
+assert(keys == (0 until 100))
+  }
+
+  test("drop all references to the underlying map once the iterator is 
exhausted") {
--- End diff --

:+1: 


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190370765
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
+val underlyingIt = map.readingIterator
+assert( underlyingIt != null )
+val underlyingMapIterator = underlyingIt.upstream
+assert(underlyingMapIterator != null)
+val underlyingMapIteratorClass = underlyingMapIterator.getClass
+assert(underlyingMapIteratorClass.getEnclosingClass == 
classOf[AppendOnlyMap[_, _]])
+
+val underlyingMap = map.currentMap
+assert(underlyingMap != null)
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert( map.numSpills == 0 )
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+assert(map.currentMap == null)
+assert(underlyingIt.upstream ne underlyingMapIterator)
+assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
+assert(underlyingIt.upstream.getClass.getEnclosingClass != 
classOf[AppendOnlyMap[_, _]])
--- End diff --

The underlying map's iterator is an anonymous class; this is the best I could come up with to check whether the upstream iterator holds a ref to the underlying map.
@cloud-fan, do you have a better idea? (I'm not 100% happy with this one.)


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189921783
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
--- End diff --

This test was written BEFORE the actual fix, and it did fail until the fix was in place. I do agree it's a bit clumsy and that potential future changes may break the original intention of the test. I've referenced a potential testing approach (currently limited to Scala's own source code) which couldn't be (easily) applied to this code base, so I made a best effort to test this.
I agree this needs better documentation; I'll start by referencing the issue in the test's name and will also add comments to the code.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189919617
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
--- End diff --

It's safer: the class remains usable if for some reason `hasNext` is called again, and this costs absolutely nothing.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189919031
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C](
*/
   def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): 
Iterator[(K, C)] = {
 readingIterator = new SpillableIterator(inMemoryIterator)
-readingIterator
+readingIterator.toCompletionIterator
--- End diff --

What behavior does it change? Your suggested code does exactly the same thing but is less streamlined and relies on an intermediate value (fortunately it's already a member variable).


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-05-22 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/21369
  
@advancedxy, using jvisualvm plus a heap dump I could see that the second introduced test case ("drop all references to the underlying map once the iterator is exhausted") eliminated all references to the underlying map:
the heap contained one instance of `SizeTrackingAppendOnlyMap`, but all references to it were unreachable, so it was eligible for collection.
I found one instance of `CompletionIterator` (actually an anonymous class deriving from it) which had references to the `SpillableIterator`: a direct ref as member `sub` and another one via member `completionFunction$1`.
The `SpillableIterator` had a single ref to the `ExternalAppendOnlyMap`, which already had its `currentMap` field nullified.



---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189794281
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
+}
+
+def toCompletionIterator: CompletionIterator[(K, C), 
SpillableIterator] = {
--- End diff --

done


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189794046
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -305,8 +310,8 @@ class ExternalAppendOnlyMap[K, V, C](
 
 // Input streams are derived both from the in-memory map and spilled 
maps on disk
 // The in-memory map is sorted in place, while the spilled maps are 
already in sorted order
-private val sortedMap = CompletionIterator[(K, C), Iterator[(K, 
C)]](destructiveIterator(
-  currentMap.destructiveSortedIterator(keyComparator)), 
freeCurrentMap())
+private val sortedMap = destructiveIterator(
+  currentMap.destructiveSortedIterator(keyComparator))
--- End diff --

Unfortunately no, Scalastyle enforces a max of 100 chars per line.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-22 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189794097
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+def destroy() : Unit = {
--- End diff --

yes, fixing


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-05-20 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/21369
  
retest this please


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-05-20 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/21369
  
Well, I took the time to figure out how the iterator is eventually being used.
(Most of) it boils down to `org.apache.spark.scheduler.ShuffleMapTask#runTask`, which does:
`writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])`
Looking at the `org.apache.spark.shuffle.ShuffleWriter#write` implementations, it seems all of them first exhaust the iterator and then perform some kind of post-processing: merging spills, sorting, writing partition files and then concatenating them into a single file... bottom line, the iterator may actually be 'sitting' around for some time after reaching EOF.
I'll implement the 'simple approach' for this PR, but I think this deserves a separate JIRA issue + PR.


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-20 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189452094
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

@jerrylead, I'd appreciate it if you could test this.
One more thing that bugs me is that there's another case where the iterator no longer needs the upstream iterator/underlying map but still holds a reference to it:
when the iterator reaches EOF, its hasNext method returns false, which causes the wrapping CompletionIterator to call the cleanup function, which simply nulls the underlying map member variable in ExternalAppendOnlyMap. The iterator member is not nulled out, so we end up with the CompletionIterator holding two paths to the upstream iterator, which leads to the underlying map: it still holds a reference to the iterator itself, and it also holds a reference to the cleanup closure, which refers to the ExternalAppendOnlyMap, which still refers to the current iterator, which refers to upstream...
This can be solved in one of two ways:
Simple way: when creating the completion iterator, provide a closure referring to the iterator, not the ExternalAppendOnlyMap.
Thorough way: modify the completion iterator to null out its references after cleaning up (see the sketch below).

Having said that, I'm not sure how long a completed iterator may be 'sitting' around before being discarded, so I'm not sure this is worth fixing, especially using the thorough approach.
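
A minimal sketch of the 'thorough way' (illustrative only; the class name and API are made up, this is not Spark's actual `CompletionIterator`):

```scala
// a completion iterator that drops its references once the wrapped iterator is
// exhausted, so neither the sub-iterator nor the cleanup closure keep the
// underlying map reachable
class ReleasingCompletionIterator[A](
    private var sub: Iterator[A],
    private var completion: () => Unit) extends Iterator[A] {

  private var completed = false

  override def hasNext: Boolean = {
    if (completed) {
      false
    } else if (sub.hasNext) {
      true
    } else {
      completed = true
      completion()            // run the cleanup exactly once
      sub = Iterator.empty    // release the upstream iterator
      completion = () => ()   // release the cleanup closure
      false
    }
  }

  override def next(): A = sub.next()
}
```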



---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r189438351
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

Basically yes; according to my understanding of the code, this should have happened on the subsequent hasNext/next call. However, according to the analysis in the JIRA, the iterator kept holding this reference. My guess: at this point the entire program started suffering lengthy GC pauses that got it behaving as if under a deadlock, effectively leaving the ref in place (just a guess).


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-05-19 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/21369
  
retest this please


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-05-18 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/21369
  
@lianhuiwang, @davies, @hvanhovell  can you please have a look?


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-18 Thread eyalfa
GitHub user eyalfa opened a pull request:

https://github.com/apache/spark/pull/21369

[SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spilled during 
iteration

## What changes were proposed in this pull request?
This PR solves [SPARK-22713](https://issues.apache.org/jira/browse/SPARK-22713), which describes a memory leak that occurs when an ExternalAppendOnlyMap is spilled during iteration (as opposed to during insertion).

ExternalAppendOnlyMap's iterator supports spilling, but it kept a reference to the internal map (via an internal iterator) after spilling. It seems the original code was actually supposed to 'get rid' of this reference on the next iteration, but according to the elaborate investigation described in the JIRA this didn't happen.
The fix is simply replacing the internal iterator immediately after spilling.

## How was this patch tested?
I've introduced a new test in the ExternalAppendOnlyMapSuite test suite; this test asserts that neither the external map itself nor its iterator holds any reference to the internal map after a spill.
This approach required relaxing the access of some member variables and nested classes of ExternalAppendOnlyMap; these members are now package private and annotated with @VisibleForTesting.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/eyalfa/spark 
SPARK-22713__ExternalAppendOnlyMap_effective_spill

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21369.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21369


commit 1c4a6af2077e7ac50476101031aa1af99ff4f7b7
Author: Eyal Farago <eyal@...>
Date:   2018-05-18T22:06:15Z

SPARK-22713__ExternalAppendOnlyMap_effective_spill: add failing test.

commit 82591e63bf30fad37b90956c226101e428d39787
Author: Eyal Farago <eyal@...>
Date:   2018-05-18T22:21:33Z

SPARK-22713__ExternalAppendOnlyMap_effective_spill: fix the issue by 
removing the reference to the initial iterator.




---




[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...

2018-02-04 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19054#discussion_r165861581
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -220,45 +220,99 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 operator.withNewChildren(children)
   }
 
+  private def isSubset(biggerSet: Seq[Expression], smallerSet: 
Seq[Expression]): Boolean =
+smallerSet.length <= biggerSet.length &&
+  smallerSet.forall(x => biggerSet.exists(_.semanticEquals(x)))
+
+  /**
+   * Reorders `leftKeys` and `rightKeys` by aligning `currentOrderOfKeys` 
to be a prefix of
+   * `expectedOrderOfKeys`
+   */
   private def reorder(
   leftKeys: Seq[Expression],
   rightKeys: Seq[Expression],
-  expectedOrderOfKeys: Seq[Expression],
-  currentOrderOfKeys: Seq[Expression]): (Seq[Expression], 
Seq[Expression]) = {
-val leftKeysBuffer = ArrayBuffer[Expression]()
-val rightKeysBuffer = ArrayBuffer[Expression]()
+  expectedOrderOfKeys: Seq[Expression], // comes from child's output 
partitioning
+  currentOrderOfKeys: Seq[Expression]): // comes from join predicate
+  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
+
+assert(leftKeys.length == rightKeys.length)
+
+val allLeftKeys = ArrayBuffer[Expression]()
+val allRightKeys = ArrayBuffer[Expression]()
+val reorderedLeftKeys = ArrayBuffer[Expression]()
+val reorderedRightKeys = ArrayBuffer[Expression]()
+
+// Tracking indicies here to track to which keys are accounted. Using 
a set based approach
+// won't work because its possible that some keys are repeated in the 
join clause
+// eg. a.key1 = b.key1 AND a.key1 = b.key2
+val processedIndicies = mutable.Set[Int]()
 
 expectedOrderOfKeys.foreach(expression => {
-  val index = currentOrderOfKeys.indexWhere(e => 
e.semanticEquals(expression))
-  leftKeysBuffer.append(leftKeys(index))
-  rightKeysBuffer.append(rightKeys(index))
+  val index = currentOrderOfKeys.zipWithIndex.find { case (currKey, i) 
=>
+!processedIndicies.contains(i) && 
currKey.semanticEquals(expression)
+  }.get._2
--- End diff --

is the find guaranteed to always succeed?
if so, it's worth a comment on the method's pre/post conditions.

a getOrElse(sys.error("...")) might also be a good way of documenting this.
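
For reference, a self-contained sketch of that pattern (plain equality stands in for `semanticEquals`; the names are illustrative only):

```scala
object FindOrError {
  // Returns the index of the first unprocessed key equal to `wanted`; the precondition
  // (the key must exist) is documented by the error message instead of an opaque .get.
  def indexOfFirstMatch[A](keys: Seq[A], wanted: A, processed: Set[Int]): Int =
    keys.zipWithIndex
      .find { case (k, i) => !processed.contains(i) && k == wanted }
      .map(_._2)
      .getOrElse(sys.error(s"expected to find $wanted among the unprocessed keys in $keys"))

  def main(args: Array[String]): Unit = {
    // "a" appears twice; index 0 is already accounted for, so we expect index 2.
    println(indexOfFirstMatch(Seq("a", "b", "a"), "a", processed = Set(0)))
  }
}
```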


---




[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...

2018-02-04 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19054#discussion_r165860433
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -220,45 +220,99 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 operator.withNewChildren(children)
   }
 
+  private def isSubset(biggerSet: Seq[Expression], smallerSet: 
Seq[Expression]): Boolean =
+smallerSet.length <= biggerSet.length &&
+  smallerSet.forall(x => biggerSet.exists(_.semanticEquals(x)))
+
+  /**
+   * Reorders `leftKeys` and `rightKeys` by aligning `currentOrderOfKeys` 
to be a prefix of
+   * `expectedOrderOfKeys`
+   */
   private def reorder(
   leftKeys: Seq[Expression],
   rightKeys: Seq[Expression],
-  expectedOrderOfKeys: Seq[Expression],
-  currentOrderOfKeys: Seq[Expression]): (Seq[Expression], 
Seq[Expression]) = {
-val leftKeysBuffer = ArrayBuffer[Expression]()
-val rightKeysBuffer = ArrayBuffer[Expression]()
+  expectedOrderOfKeys: Seq[Expression], // comes from child's output 
partitioning
+  currentOrderOfKeys: Seq[Expression]): // comes from join predicate
+  (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = {
--- End diff --

can you please add a comment describing the return type? a Tuple4 is not 
such a descriptive type :smiley: 
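
One way to make it self-describing (a sketch only, not part of the PR) is a small case class instead of a Tuple4; the field names below mirror the buffers in the diff, with Seq[String] standing in for Seq[Expression]:

```scala
case class ReorderedKeys(
    allLeftKeys: Seq[String],        // all left keys, aligned to the child's partitioning
    allRightKeys: Seq[String],       // all right keys, aligned the same way
    reorderedLeftKeys: Seq[String],  // the reordered prefix used for the join
    reorderedRightKeys: Seq[String])

object ReorderedKeysDemo extends App {
  val r = ReorderedKeys(Seq("a", "b"), Seq("x", "y"), Seq("a"), Seq("x"))
  println(r.reorderedLeftKeys)       // call sites read much better than r._3
}
```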


---




[GitHub] spark pull request #19481: [SPARK-21907][CORE][BACKPORT 2.2] oom during spil...

2017-10-13 Thread eyalfa
Github user eyalfa closed the pull request at:

https://github.com/apache/spark/pull/19481


---




[GitHub] spark issue #19481: [SPARK-21907][CORE][BACKPORT 2.2] oom during spill

2017-10-12 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/19481
  
@hvanhovell , @juliuszsompolski


---




[GitHub] spark pull request #19481: [SPARK-21907][CORE][BACKPORT 2.2] oom during spil...

2017-10-12 Thread eyalfa
GitHub user eyalfa opened a pull request:

https://github.com/apache/spark/pull/19481

[SPARK-21907][CORE][BACKPORT 2.2] oom during spill

back-port #19181 to branch-2.2.

1. a test reproducing 
[SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907)
2. a fix for the root cause of the issue.

`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls 
`org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset`, which may trigger another spill; 
when this happens, the `array` member is already de-allocated but still 
referenced by the code, which causes the nested spill to fail with an NPE in 
`org.apache.spark.memory.TaskMemoryManager.getPage`.
This patch introduces a reproduction in a test case and a fix: it simply sets 
the in-mem sorter's array member to an empty array before actually performing 
the allocation. This prevents the spilling code from 'touching' the 
de-allocated array.
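
As a rough illustration of that ordering (hypothetical names, not the real sorter), the key point is that the freed buffer is detached before the new allocation, so a nested spill triggered by the allocation never sees it:

```scala
// Simplified stand-in for the in-memory sorter; `allocate` may itself trigger a spill.
class InMemBuffer(allocate: Int => Array[Long], free: Array[Long] => Unit, size: Int) {
  private var array: Array[Long] = allocate(size)

  def reset(): Unit = {
    free(array)
    array = Array.emptyLongArray // placeholder: no live reference to the freed buffer
    array = allocate(size)       // if this spills again, it only sees the empty placeholder
  }
}
```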

introduced a new test case: 
`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`.

Author: Eyal Farago <e...@nrgene.com>

Closes #19181 from eyalfa/SPARK-21907__oom_during_spill.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/eyalfa/spark SPARK-21907__oom_during_spill__BACKPORT-2.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19481


commit 9b6c5d2672a628951a6f35802dd569ea91a544a7
Author: Eyal Farago <e...@nrgene.com>
Date:   2017-10-10T20:49:47Z

[SPARK-21907][CORE] oom during spill

1. a test reproducing 
[SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907)
2. a fix for the root cause of the issue.

`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls 
`org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset`, which may trigger another spill; 
when this happens, the `array` member is already de-allocated but still 
referenced by the code, which causes the nested spill to fail with an NPE in 
`org.apache.spark.memory.TaskMemoryManager.getPage`.
This patch introduces a reproduction in a test case and a fix: it simply sets 
the in-mem sorter's array member to an empty array before actually performing 
the allocation. This prevents the spilling code from 'touching' the 
de-allocated array.

introduced a new test case: 
`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`.

Author: Eyal Farago <e...@nrgene.com>

Closes #19181 from eyalfa/SPARK-21907__oom_during_spill.




---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/19181
  
@hvanhovell , thanks :+1: 


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-30 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r142005517
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
 verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+final UnsafeExternalSorter sorter = newSorter();
+for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+  insertNumber(sorter, i);
+}
+// todo: this might actually not be zero if pageSize is somehow 
configured differently,
+// so we actually have to compute the expected spill size according to 
the configured page size
+assertEquals(0,  sorter.getSpillSize());
--- End diff --

well, it's a tricky one...
As far as I can track the construction and logic of the relevant classes here, 
and given that we're sorting Ints, we're not supposed to spill before 
expandPointersArray requests additional memory; it is, however, very difficult 
and 'involved' to actually check this during the test.
I can set the memoryManager to choke **before** the first loop; this way, if my 
assumption breaks and the sorter attempts to allocate memory, we'd get an OOM 
sooner than expected, effectively failing the test.

@juliuszsompolski , do you find this approach better? It'd still need a comment 
describing the assumption about memory usage by the sorter.
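
Concretely, 'choking' the allocator could look roughly like this (a sketch only, mirroring the markConseqOOM call used in the test above; the class and signatures here are hypothetical, not the actual TestMemoryManager API):

```scala
// Fails the next `k` allocation requests, then behaves normally again.
class ChokingAllocator {
  private var failuresLeft = 0

  def markConseqOOM(k: Int): Unit = failuresLeft = k

  def allocate(bytes: Int): Array[Byte] = {
    if (failuresLeft > 0) {
      failuresLeft -= 1
      throw new OutOfMemoryError(s"test-induced OOM while allocating $bytes bytes")
    }
    new Array[Byte](bytes)
  }
}
```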


---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-09-26 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/19181
  
@hvanhovell  ?


---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-09-20 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/19181
  
@hvanhovell , PTAL
let me know if there's anything that requires fixing here.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-18 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r139594335
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
 verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+final UnsafeExternalSorter sorter = newSorter();
+for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
--- End diff --

according to 
[Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81782/testReport/org.apache.spark.util.collection.unsafe.sort/UnsafeExternalSorterSuite/testOOMDuringSpill/):
 7ms, I guess we're ok than :sunglasses: 


---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-09-15 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/19181
  
I agree, but 'it is what it is' 😎
We can probably come up with some mechanism that detects such scenarios and
avoids invoking the spill method on an object that's already 'on the stack'.
I think this requires more involvement from the committers, perhaps in a
separate JIRA?
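
A rough sketch of the kind of guard I have in mind (purely illustrative, not a proposed patch): a consumer that refuses to re-enter spill() while a spill is already on the stack.

```scala
abstract class GuardedSpillable {
  private var spilling = false

  /** The actual spill logic; returns the number of bytes released. */
  protected def doSpill(): Long

  /** Re-entrant-safe entry point: a nested call reports 0 bytes instead of recursing. */
  final def spill(): Long =
    if (spilling) 0L
    else {
      spilling = true
      try doSpill()
      finally spilling = false
    }
}
```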

On Sep 15, 2017 14:27, "Juliusz Sompolski" <notificati...@github.com> wrote:
    
> @eyalfa <https://github.com/eyalfa> Thanks. I agree that this is a good
> fix to this issue and lgtm.
> I'm just worried that there are more lurking cases where a nested spill
> can trigger and cause something unexpected, and that finding and
> reproducing such other similar issues may be difficult.
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/19181#issuecomment-32971>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/ABFFOSIoc8O29CLcpfn2hPWJV_5neQjqks5sil8WgaJpZM4PSVvS>
> .
>



---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-09-14 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/19181
  
@juliuszsompolski,
if you comment out the few added lines in the reset() method, you'd see that 
the test fails with a stack trace very similar to the one you pasted in the JIRA.


---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-09-14 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/19181
  
@hvanhovell,  @juliuszsompolski,
the failure seems unrelated to my work; can you please request a retest?


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-13 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r138751839
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 ---
@@ -503,6 +511,47 @@ public void testGetIterator() throws Exception {
 verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Rule
+  public ExpectedException exceptions = ExpectedException.none();
+
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+final UnsafeExternalSorter sorter = newSorter();
+UnsafeInMemorySorter unsafeInMemSorter = sorter.inMemSorter;
+for (int i = 0; unsafeInMemSorter.hasSpaceForAnotherRecord(); ++i) {
+  insertNumber(sorter, i);
+}
+// todo: this might actually not be zero if pageSize is somehow 
configured differently,
+// so we actually have to compute the expected spill size according to 
the configured page size
+assertEquals(0,  sorter.getSpillSize());
+// we expect the next insert to attempt growing the pointerssArray
+// first allocation is expected to fail, then a spill is triggered 
which attempts another allocation
+// which also fails and we expect to see this OOM here.
+// the original code messed with a released array within the spill code
+// and ended up with a failed assertion.
+// we also expect the location of the OOM to be 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+memoryManager.markConseqOOM(2);
+exceptions.expect(OutOfMemoryError.class);
--- End diff --

I'm used to ScalaTest's Matchers; things could be so much easier if this test 
were written in Scala...
Will rewrite...
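
For comparison, the ScalaTest idiom I'm thinking of looks roughly like this (a sketch assuming ScalaTest 3.1+; the throwing call is just a stand-in for driving the sorter into a nested spill):

```scala
import org.scalatest.funsuite.AnyFunSuite

class OomDuringSpillSketch extends AnyFunSuite {
  test("nested spill surfaces the OOM from reset()") {
    val err = intercept[OutOfMemoryError] {
      // stand-in for: insert one more record and force the nested spill
      throw new OutOfMemoryError("simulated allocation failure")
    }
    assert(err.getMessage.contains("simulated"))
  }
}
```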


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-13 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r138751518
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 ---
@@ -85,7 +85,7 @@
   private final LinkedList spillWriters = new 
LinkedList<>();
 
   // These variables are reset after spilling:
-  @Nullable private volatile UnsafeInMemorySorter inMemSorter;
+  @VisibleForTesting @Nullable volatile UnsafeInMemorySorter inMemSorter;
--- End diff --

@hvanhovell , is it acceptable to add a method indicating whether the next 
insert is going to spill? I can make it @VisibleForTesting and prefix it with 
something like testOnly.
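
Something along these lines (a rough sketch, names hypothetical apart from hasSpaceForAnotherRecord, which the test above already uses); it avoids widening the field's visibility:

```scala
final class PointerBuffer(capacity: Int) {
  private val data = new Array[Long](capacity)
  private var used = 0

  def insert(v: Long): Unit = {
    require(hasSpaceForAnotherRecord, "caller should have spilled first")
    data(used) = v
    used += 1
  }

  // test-only accessor: lets a suite fill the buffer right up to the spill point
  def hasSpaceForAnotherRecord: Boolean = used < capacity
}
```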


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-12 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r138373142
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 ---
@@ -170,6 +170,10 @@ public void free() {
   public void reset() {
 if (consumer != null) {
   consumer.freeArray(array);
+  array = LongArray.empty;
--- End diff --

@hvanhovell ,
I'm starting to have second thoughts about the special `empty` instance here; 
I'm afraid that the nested call might trigger `freeArray` or something similar 
on it.
Perhaps using null is a better option here?


---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-09-10 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/19181
  
@ericl , @davies ,
you guys seem to be the last ones to edit this area of the code, I'd 
appreciate if you could take a look.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-10 Thread eyalfa
GitHub user eyalfa opened a pull request:

https://github.com/apache/spark/pull/19181

[SPARK-21907][CORE]  oom during spill

## What changes were proposed in this pull request?
1. a test reproducing 
[SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907)
2. a fix for the root cause of the issue.

`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls 
`org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset`, which may trigger another spill; 
when this happens, the `array` member is already de-allocated but still 
referenced by the code, which causes the nested spill to fail with an NPE in 
`org.apache.spark.memory.TaskMemoryManager.getPage`.
This patch introduces a reproduction in a test case and a fix: it simply sets 
the in-mem sorter's array member to an empty array before actually performing 
the allocation. This prevents the spilling code from 'touching' the 
de-allocated array.

## How was this patch tested?
introduced a new test case: 
`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/eyalfa/spark SPARK-21907__oom_during_spill

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19181.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19181


commit c9cbe1a3e3e794e9c1ee54d498301c3f332c3f6c
Author: Eyal Farago <e...@nrgene.com>
Date:   2017-09-10T10:05:06Z

SPARK-21907__oom_during_spill: introduce a reproducing test.

commit cc8ccfd3f3956a7652ec82e9748ec56609b19800
Author: Eyal Farago <e...@nrgene.com>
Date:   2017-09-10T14:29:32Z

SPARK-21907__oom_during_spill: fix the root cause of this bug, improve test 
by requiring OOM exception thrown from the reset() method.




---




[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...

2017-08-16 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/18855
  
@cloud-fan , @vanzin ,
any idea what happened to this build? It seems like an environment issue after a 
successful build (0 failed tests, 'Build step 'Execute shell' marked build as 
failure').
Can one of you kindly ask Jenkins to retest?


---



[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...

2017-08-15 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/18855
  
Funny enough, that's the approach I've chosen.

On Aug 15, 2017 19:17, "Marcelo Vanzin" <notificati...@github.com> wrote:

> *@vanzin* commented on this pull request.
> --
>
> In core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
> <https://github.com/apache/spark/pull/18855#discussion_r133232857>:
>
> > @@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite 
with Matchers with BeforeAndAfterE
>super.fetchBlockSync(host, port, execId, blockId)
>  }
>}
> +
> +  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
>
> It would be probably easier to propagate the chunk size as a SparkConf
> entry that is not documented. But up to you guys.
>
> —
> You are receiving this because you authored the thread.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/18855#discussion_r133232857>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/ABFFOZXLge3svlsrRWDJnt_xImYb6cTjks5sYcSVgaJpZM4OukZQ>
> .
>



---



[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133180495
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

yes, currently working on:
1. parameterizing `DiskStore` and `DiskStoreSuite`
2. reverting the tests in `BlockManagerSuite`
3. reverting the 6GB change in sbt


---



[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133144224
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

@cloud-fan , @vanzin ,
taking the 'parameterized' approach, I'd remove most of the tests from 
`BlockManagerSuite`, as they'd require propagating this parameter to too many 
subsystems.
So I'm going to modify `DiskStore` and `DiskStoreSuite` to use such a parameter. 
I'm not sure about leaving a test case in `BlockManagerSuite` that tests 
`DISK_ONLY` persistence; what do you guys think?


---



[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133141988
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

@cloud-fan  I know, 
it even gets worse when using the `===` operator.

I'm currently exploring the second direction pointed out by @vanzin: 
introducing a test-only configuration key to configure the max page size.
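
Roughly what I mean by a test-only key (the key name below is just an illustration, not a committed name):

```scala
import org.apache.spark.SparkConf

object MaxChunkSize {
  // Undocumented, test-only knob; production code never sets it, so the default applies.
  def apply(conf: SparkConf): Long =
    conf.getLong("spark.storage.memoryMapLimitForTests", Int.MaxValue.toLong)
}
```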


---



[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-15 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r133135659
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) {
--- End diff --

@vanzin , 
I've measured: test case times range from 7 to 25 seconds on my laptop.
Point well taken :sunglasses: 


---



[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-14 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r132978316
  
--- Diff: project/SparkBuild.scala ---
@@ -790,7 +790,7 @@ object TestSettings {
 javaOptions in Test ++= 
System.getProperties.asScala.filter(_._1.startsWith("spark"))
   .map { case (k,v) => s"-D$k=$v" }.toSeq,
 javaOptions in Test += "-ea",
-javaOptions in Test ++= "-Xmx3g -Xss4096k"
+javaOptions in Test ++= "-Xmx6g -Xss4096k"
--- End diff --

@cloud-fan , let's wait a few hours and see what the other guys CCed on this 
(the last ones to edit the build) have to say. If they are also worried, or do 
not comment, I'll revert this.

I must say I'm reluctant to revert these tests, as I personally believe that the 
lack of such tests contributed to Spark's 2GB issues, including this one.


---



[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...

2017-08-11 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/18855
  
@cloud-fan I think I found the sbt setting that controls the max heap size for 
forked tests; I've increased it from 3g to 6g.
cc: @srowen, @vanzin and @a-roberts, you guys seem to be the last ones to update 
this area in sbt; please review.


---



[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...

2017-08-09 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/18855
  
Agreed, though I can't really understand it...
I ran SBT locally with --mem 12000 and provided the magical JVM flag that prints
the CLI args the JVM received; it seems SBT is indeed running with 12GB and the
test is still failing, even when running this test only.

Does SBT fork before executing tests? If so, how does it configure the JVM
running the tests?

On Aug 9, 2017 08:29, "Wenchen Fan" <notificati...@github.com> wrote:

> since this PR only focus on DiskStore, shall we remove the new tests in
> BlockManagerSuite? Seems the OOM only happens in BlockManagerSuite
>
> —
> You are receiving this because you authored the thread.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/18855#issuecomment-321156406>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/ABFFOZ8ahiQXd_dce4M6UG-Wib8ebaBTks5sWUOmgaJpZM4OukZQ>
> .
>



---



[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...

2017-08-08 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/18855
  
@cloud-fan, any idea how much memory is allocated for running sbt? Currently one 
of my newly introduced tests fails with an OOM during kryo serialization...

It's actually a bit weird, as this is the test that stores the objects in memory 
deserialized. Notice that the test doesn't really generate a true 2GB dataset, 
since the iterator returns the same array on every `next()` call; so basically 
the underlying buffer (actually buffers, as it uses a chunked byte buffer) used 
by kryo is the first materialization of such a large piece of data.
The sbt execution of this suite seems to stop on the first error, so I can't 
tell if the 'memory deserialized' version of this test would have passed.

Please advise: is it possible to increase sbt's process heap size? Or should I 
remove/comment/ignore these tests?
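
For context, the trick referred to above looks roughly like this: over 2GB of logical data backed by a single reused 1MB buffer, so only the serializer ever materializes the full size (a sketch, in the same shape as the test's blob generator):

```scala
// One 1MB buffer, handed out 2049 times: logically > 2GB, physically 1MB on the heap.
val buff = new Array[Byte](1024 * 1024)
val blobs: Iterator[Array[Byte]] = Iterator.fill(2 * 1024 + 1)(buff)
```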


---



[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...

2017-08-08 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/18855
  
@cloud-fan , can you please give the 'ok to test'?


---



[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131914883
  
--- Diff: core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala 
---
@@ -92,6 +92,31 @@ class DiskStoreSuite extends SparkFunSuite {
 assert(diskStore.getSize(blockId) === 0L)
   }
 
+  test("blocks larger than 2gb") {
+val conf = new SparkConf()
+val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = 
true)
+val diskStore = new DiskStore(conf, diskBlockManager, new 
SecurityManager(conf))
+
+val mb = 1024 * 1024
+val gb = 1024L * mb
+
+val blockId = BlockId("rdd_1_2")
+diskStore.put(blockId) { chan =>
+  val arr = new Array[Byte](mb)
+  for {
+_ <- 0 until 2048
+  } {
+val buf = ByteBuffer.wrap(arr)
+while (buf.hasRemaining()) {
+  chan.write(buf)
+}
+  }
+}
+
+val blockData = diskStore.getBytes(blockId)
+assert(blockData.size == 2 * gb)
--- End diff --

possible, will fix.
I guess I aimed for the lowest possible failing value


---



[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131913940
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
+  assert(res1.isLeft)
+  assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+val getResult = store.get(RDDBlockId(42, 0))
+withClue(getResult) {
+  assert(getResult.isDefined)
+  assert(getResult.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+val getBlockRes = store.getBlockData(RDDBlockId(42, 0))
+withClue(getBlockRes) {
+  try {
+assert(getBlockRes.size() >= 2 * 1024 * 1024 * 1024)
+Utils.tryWithResource(getBlockRes.createInputStream()) { inpStrm =>
+  val iter = store
+.serializerManager
+.dataDeserializeStream(RDDBlockId(42, 0)
+  , inpStrm)(implicitly[ClassTag[Array[Byte]]])
+  assert(iter.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
+  a != null &&
+b != null &&
+a.asInstanceOf[Array[Byte]].seq == 
b.asInstanceOf[Array[Byte]].seq
+  })
+}
+  } finally {
+getBlockRes.release()
+  }
+}
+  }
+
+  test("getOrElseUpdate > 2gb, storage level = disk only") {
--- End diff --

these tests cover more than just the `DiskOnly` storage level; they were crafted 
when I had bigger ambitions of solving the entire 2GB issue :sunglasses: , 
before seeing some ~100-file pull requests being abandoned or rejected.
Aside from that, these tests also exercise the entire orchestration done by 
`BlockManager` when an `RDD` requests a cached partition; notice that these 
tests intentionally make two calls to the BlockManager in order to simulate 
both code paths (cache hit, cache miss).


---



[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131912886
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
+  assert(res1.isLeft)
+  assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall {
+case (a, b) =>
--- End diff --

we can't compare Arrays directly: you get identity equality, which is usually 
not what you want. Hence the `.seq`, which forces the array to be wrapped in a Seq.
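
A quick illustration of the point (plain Scala, nothing PR-specific):

```scala
val a = Array(1, 2, 3)
val b = Array(1, 2, 3)
assert(a != b)              // Array equality is reference equality: different objects
assert(a.toSeq == b.toSeq)  // wrapping in a Seq (via .seq / .toSeq) compares elements
```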


---



[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-08 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131912438
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
@@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
   super.fetchBlockSync(host, port, execId, blockId)
 }
   }
+
+  def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) {
+store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1")
+def mkBlobs() = {
+  val rng = new java.util.Random(42)
+  val buff = new Array[Byte](1024 * 1024)
+  rng.nextBytes(buff)
+  Iterator.fill(2 * 1024 + 1) {
+buff
+  }
+}
+val res1 = store.getOrElseUpdate(
+  RDDBlockId(42, 0),
+  storageLevel,
+  implicitly[ClassTag[Array[Byte]]],
+  mkBlobs _
+)
+withClue(res1) {
--- End diff --

I think it'd print an Either whose left side is a case class with these members: 
an iterator (which prints as an empty/non-empty iterator), an enum and a number 
of bytes; the right side is an iterator, which again prints as an empty/non-empty 
iterator.


---



[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...

2017-08-07 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131758308
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +147,62 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+conf: SparkConf,
+file: File,
+blockSize: Long) extends BlockData {
+
+  private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+
+  override def toInputStream(): InputStream = new FileInputStream(file)
+
+  /**
+  * Returns a Netty-friendly wrapper for the block's data.
+  *
+  * Please see `ManagedBuffer.convertToNetty()` for more details.
+  */
+  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
+
+  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): 
ChunkedByteBuffer = {
+Utils.tryWithResource(open()) { channel =>
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = allocator(chunkSize.toInt)
+remaining -= chunkSize
+JavaUtils.readFully(channel, chunk)
+chunk.flip()
+chunks += chunk
+  }
+  new ChunkedByteBuffer(chunks.toArray)
+}
+  }
+
+  override def toByteBuffer(): ByteBuffer = {
--- End diff --

@cloud-fan 
it took me roughly 4 hours, but I looked both at the shuffle code path and at 
`BlockManager.getRemoteBytes`:
it seems the former is robust to large blocks, since it uses Netty's streaming 
capabilities,
while the latter seems to be broken, as it does not use Netty's streaming 
capabilities and actually tries to copy the result buffer into a heap-based 
buffer. I think this deserves its own JIRA/PR.
I think these two places, plus the external shuffle server, cover most of the 
relevant use cases (aside from local caching, which I believe this PR completes 
in terms of being 2GB-proof).


---



[GitHub] spark pull request #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fa...

2017-08-07 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131713763
  
--- Diff: core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala 
---
@@ -92,6 +92,31 @@ class DiskStoreSuite extends SparkFunSuite {
 assert(diskStore.getSize(blockId) === 0L)
   }
 
+  test("blocks larger than 2gb") {
+val conf = new SparkConf()
+val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = 
true)
+val diskStore = new DiskStore(conf, diskBlockManager, new 
SecurityManager(conf))
+
+val mb = 1024 * 1024
+val gb = 1024L * mb
+
+val blockId = BlockId("rdd_1_2")
+diskStore.put(blockId) { chan =>
+  val arr = new Array[Byte](mb)
+  for {
+_ <- 0 until 2048
+  } {
+val buf = ByteBuffer.wrap(arr)
+while (buf.hasRemaining()) {
+  chan.write(buf)
+}
+  }
+}
+
+val blockData = diskStore.getBytes(blockId)
--- End diff --

@kiszk, this is the test case I was referring to.
I actually introduced it prior to (hopefully) fixing the bug in 
`DiskStore.getBytes`.


---



[GitHub] spark pull request #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fa...

2017-08-07 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131712834
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +147,62 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+conf: SparkConf,
+file: File,
+blockSize: Long) extends BlockData {
+
+  private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+
+  override def toInputStream(): InputStream = new FileInputStream(file)
+
+  /**
+  * Returns a Netty-friendly wrapper for the block's data.
+  *
+  * Please see `ManagedBuffer.convertToNetty()` for more details.
+  */
+  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
+
+  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): 
ChunkedByteBuffer = {
+Utils.tryWithResource(open()) { channel =>
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = allocator(chunkSize.toInt)
+remaining -= chunkSize
+JavaUtils.readFully(channel, chunk)
+chunk.flip()
+chunks += chunk
+  }
+  new ChunkedByteBuffer(chunks.toArray)
+}
+  }
+
+  override def toByteBuffer(): ByteBuffer = {
--- End diff --

indeed.
I chose to postpone the failure from `DiskStore.getBytes` to this place, as I 
believe it introduces no regression while still allowing the more common 
'streaming'-like use case.

Furthermore, I think this plays well with the comment about the future 
deprecation of `org.apache.spark.network.buffer.ManagedBuffer#nioByteBuffer`, 
which seems to be the main reason for `BlockData` exposing the `toByteBuffer` 
method.


---



[GitHub] spark issue #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fails for...

2017-08-07 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/18855
  
@HyukjinKwon , interesting reading, but I couldn't find a concrete reason for, 
or solution to, the issue.
@cloud-fan, I encountered this bug when working with a disk-persisted RDD; it 
turned out some of our partitions exceeded the 2GB limit and failed in 
`DiskStore.getBytes`.

Since we're using a specific commercial distro, I couldn't test this patch on my 
own use case (we got rid of the large partitions anyway by tuning the number of 
partitions and tweaking the DAG altogether :sunglasses: ), so I did the next 
best thing: reproduce the issue in a test case. Please see the test "blocks 
larger than 2gb" in `DiskStoreSuite` in this PR.


---



[GitHub] spark issue #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fails for...

2017-08-06 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/18855
  
@kiszk , fixed styling+readability according to your comments.
BTW, any idea why JIRA didn't associate this PR with SPARK-3151?


---



[GitHub] spark pull request #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fa...

2017-08-06 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131543591
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +147,62 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+conf: SparkConf,
+file: File,
+blockSize: Long) extends BlockData {
+
+  private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+
+  override def toInputStream(): InputStream = new FileInputStream(file)
+
+  /**
+  * Returns a Netty-friendly wrapper for the block's data.
+  *
+  * Please see `ManagedBuffer.convertToNetty()` for more details.
+  */
+  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
+
+  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): 
ChunkedByteBuffer = {
+Utils.tryWithResource(open()) { channel =>
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = allocator(chunkSize.toInt)
+remaining -= chunkSize
+JavaUtils.readFully(channel, chunk)
+chunk.flip()
+chunks += chunk
+  }
+  new ChunkedByteBuffer(chunks.toArray)
+}
+  }
+
+  override def toByteBuffer(): ByteBuffer = {
+require( size < Int.MaxValue
--- End diff --

@kiszk , not sure I'm following your comment.
this requirement results in an explicit error when one tries to obtain a 
ByteBuffer larger than java.nio's limitations.
The original code used to fail at line 115 when calling `ByteBuffer.allocate` 
with a block size larger than 2GB; the newer code fails explicitly in this case.

...or do you mean referring to the `blockSize` val rather than the `size` method? 
I can't really see a difference in that case.


---



[GitHub] spark pull request #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fa...

2017-08-06 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/18855#discussion_r131543602
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -165,6 +147,62 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+conf: SparkConf,
+file: File,
+blockSize: Long) extends BlockData {
+
+  private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+
+  override def toInputStream(): InputStream = new FileInputStream(file)
+
+  /**
+  * Returns a Netty-friendly wrapper for the block's data.
+  *
+  * Please see `ManagedBuffer.convertToNetty()` for more details.
+  */
+  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
+
+  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): 
ChunkedByteBuffer = {
+Utils.tryWithResource(open()) { channel =>
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = allocator(chunkSize.toInt)
+remaining -= chunkSize
+JavaUtils.readFully(channel, chunk)
+chunk.flip()
+chunks += chunk
+  }
+  new ChunkedByteBuffer(chunks.toArray)
+}
+  }
+
+  override def toByteBuffer(): ByteBuffer = {
+require( size < Int.MaxValue
+  , s"can't create a byte buffer of size $blockSize"
++ s" since it exceeds Int.MaxValue ${Int.MaxValue}.")
+Utils.tryWithResource(open()) { channel =>
+  if (blockSize < minMemoryMapBytes) {
+// For small files, directly read rather than memory map.
+val buf = ByteBuffer.allocate(blockSize.toInt)
+JavaUtils.readFully(channel, buf)
+buf.flip()
+buf
+  } else {
+channel.map(MapMode.READ_ONLY, 0, file.length)
+  }
+}
+  }
+
+  override def size: Long = blockSize
+
+  override def dispose(): Unit = {}
+
+private def open() = new FileInputStream(file).getChannel
--- End diff --

will do :sunglasses: 


---



[GitHub] spark issue #18855: [Spark 3151][Block Manager] DiskStore.getBytes fails for...

2017-08-05 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/18855
  
@rxin, @JoshRosen , @cloud-fan ,
you seem to be the last guys to touch this class, can you please review?


---



[GitHub] spark pull request #18855: [Spark 3151][Block Manager] DiskStore.getBytes fa...

2017-08-05 Thread eyalfa
GitHub user eyalfa opened a pull request:

https://github.com/apache/spark/pull/18855

[Spark 3151][Block Manager] DiskStore.getBytes fails for files larger than 
2GB

## What changes were proposed in this pull request?
introduced `DiskBlockData`, a new implementation of `BlockData` representing a 
whole file.
This is somewhat related to 
[SPARK-6236](https://issues.apache.org/jira/browse/SPARK-6236) as well.

This class follows the implementation of `EncryptedBlockData`, just without the 
encryption. Hence:
* it uses FileInputStream (todo: the encrypted version actually uses 
`Channels.newInputStream`, not sure if it's the right choice for this)
* `toNetty` is implemented in terms of 
`io.netty.channel.DefaultFileRegion#DefaultFileRegion`
* `toByteBuffer` fails for files larger than 2GB (the same behavior as the 
original code, just postponed a bit); it also respects the same configuration 
keys defined by the original code to choose between memory mapping and a simple 
file read.

## How was this patch tested?
added test to DiskStoreSuite and MemoryManagerSuite

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/eyalfa/spark SPARK-3151

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18855.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18855


commit fc3f1d78e14a30dd2f71fc65ec59a2def5c1a0d4
Author: Eyal Farago <e...@nrgene.com>
Date:   2017-07-05T13:20:16Z

SPARK-6235__take1: introduce a failing test.

commit 84687380026a6a3bcded27be517094d3f690c3bb
Author: Eyal Farago <e...@nrgene.com>
Date:   2017-07-30T20:06:05Z

SPARK-6235__add_failing_tests: add failing tests for block manager suite.

commit 15804497a477b8f97c08adfad5f0519504dc82f2
Author: Eyal Farago <e...@nrgene.com>
Date:   2017-08-01T17:34:26Z

SPARK-6235__add_failing_tests: introduce a new BlockData implementation to 
represent a disk backed block data.

commit c5028f50698c4fe48a06f5dd683dbee42f7e6b2b
Author: Eyal Farago <e...@nrgene.com>
Date:   2017-08-05T19:57:41Z

SPARK-6235__add_failing_tests: styling

commit 908c7860688534d0bb77bcbebbd2e006a161fb74
Author: Eyal Farago <e...@nrgene.com>
Date:   2017-08-05T19:58:52Z

SPARK-6235__add_failing_tests: adapt DiskStoreSuite to the modifications in 
the tested class.

commit 67f4259ca16c3ca7c904c9ccc5de9acbc25d2271
Author: Eyal Farago <e...@nrgene.com>
Date:   2017-08-05T20:57:58Z

SPARK-6235__add_failing_tests: try to reduce actual memory footprint of the 
>2gb tests.




---



[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...

2017-02-04 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/16043#discussion_r99475332
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
 ---
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types._
+
+/**
+* SPARK-18601 discusses simplification direct access to complex types 
creators.
+* i.e. {{{create_named_struct(square, `x` * `x`).square}}} can be 
simplified to {{{`x` * `x`}}}.
+* sam applies to create_array and create_map
+*/
+class ComplexTypesSuite extends PlanTest{
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("collapse projections", FixedPoint(10),
+  CollapseProject) ::
+  Batch("Constant Folding", FixedPoint(10),
+  NullPropagation(conf),
+  ConstantFolding,
+  BooleanSimplification,
+  SimplifyConditionals,
+  SimplifyBinaryComparison,
+  SimplifyCreateStructOps,
+  SimplifyCreateArrayOps,
+  SimplifyCreateMapOps) :: Nil
+  }
+
+  val idAtt = ('id).long.notNull
+
+  lazy val baseOptimizedPlan = Range(1L, 1000L, 1, Some(2), idAtt :: Nil)
+
+  val idRef = baseOptimizedPlan.output.head
+
+  implicit class ComplexTypeDslSupport(e : Expression) {
+def getStructField(f : String): GetStructField = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[StructType])
+  val structType = e.dataType.asInstanceOf[StructType]
+  val ord = structType.fieldNames.indexOf(f)
+  assert(-1 != ord)
+  GetStructField(e, ord, Some(f))
+}
+
+def getArrayStructField(f : String) : Expression = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[ArrayType])
+  val arrType = e.dataType.asInstanceOf[ArrayType]
+  assert(arrType.elementType.isInstanceOf[StructType])
+  val structType = arrType.elementType.asInstanceOf[StructType]
+  val ord = structType.fieldNames.indexOf(f)
+  assert(-1 != ord)
+  GetArrayStructFields(e, structType(ord), ord, 1, 
arrType.containsNull)
+}
+
+def getArrayItem(i : Int) : GetArrayItem = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[ArrayType])
+  GetArrayItem(e, Literal(i))
+}
+
+def getMapValue(k : Expression) : Expression = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[MapType])
+  val mapType = e.dataType.asInstanceOf[MapType]
+  assert(k.dataType == mapType.keyType)
+  GetMapValue(e, k)
+}
+
+def addCaseWhen( cond : Expression, v : Expression) : Expression = {
+  assert(e.resolved)
+  assert(e.isInstanceOf[CaseWhen])
+  assert(cond.dataType == BooleanType)
+  assert(v.dataType == e.dataType)
+  val CaseWhen(branches, default) = e
+  CaseWhen( branches :+ (cond, v), default)
+}
+  }
+
+  test("explicit") {
+val rel = baseOptimizedPlan.select(
+  CreateNamedStruct("att" :: idRef :: Nil).getStructField("att") as 
"outerAtt"
+   )
+
+assertResult(StructType(StructField("outerAtt", LongType, nullable = 
false) :: Nil))(rel.schema)
+
+val optimized = Optimize execute rel
+
+  

[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...

2017-02-04 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/16043#discussion_r99475106
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
 ---
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types._
+
+/**
+* SPARK-18601 discusses simplification direct access to complex types 
creators.
+* i.e. {{{create_named_struct(square, `x` * `x`).square}}} can be 
simplified to {{{`x` * `x`}}}.
+* sam applies to create_array and create_map
+*/
+class ComplexTypesSuite extends PlanTest{
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("collapse projections", FixedPoint(10),
+  CollapseProject) ::
+  Batch("Constant Folding", FixedPoint(10),
+  NullPropagation(conf),
+  ConstantFolding,
+  BooleanSimplification,
+  SimplifyConditionals,
+  SimplifyBinaryComparison,
+  SimplifyCreateStructOps,
+  SimplifyCreateArrayOps,
+  SimplifyCreateMapOps) :: Nil
+  }
+
+  val idAtt = ('id).long.notNull
+
+  lazy val baseOptimizedPlan = Range(1L, 1000L, 1, Some(2), idAtt :: Nil)
+
+  val idRef = baseOptimizedPlan.output.head
+
+  implicit class ComplexTypeDslSupport(e : Expression) {
+def getStructField(f : String): GetStructField = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[StructType])
+  val structType = e.dataType.asInstanceOf[StructType]
+  val ord = structType.fieldNames.indexOf(f)
+  assert(-1 != ord)
+  GetStructField(e, ord, Some(f))
+}
+
+def getArrayStructField(f : String) : Expression = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[ArrayType])
+  val arrType = e.dataType.asInstanceOf[ArrayType]
+  assert(arrType.elementType.isInstanceOf[StructType])
+  val structType = arrType.elementType.asInstanceOf[StructType]
+  val ord = structType.fieldNames.indexOf(f)
+  assert(-1 != ord)
+  GetArrayStructFields(e, structType(ord), ord, 1, 
arrType.containsNull)
+}
+
+def getArrayItem(i : Int) : GetArrayItem = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[ArrayType])
+  GetArrayItem(e, Literal(i))
+}
+
+def getMapValue(k : Expression) : Expression = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[MapType])
+  val mapType = e.dataType.asInstanceOf[MapType]
+  assert(k.dataType == mapType.keyType)
+  GetMapValue(e, k)
+}
+
+def addCaseWhen( cond : Expression, v : Expression) : Expression = {
+  assert(e.resolved)
+  assert(e.isInstanceOf[CaseWhen])
+  assert(cond.dataType == BooleanType)
+  assert(v.dataType == e.dataType)
+  val CaseWhen(branches, default) = e
+  CaseWhen( branches :+ (cond, v), default)
+}
+  }
+
+  test("explicit") {
+val rel = baseOptimizedPlan.select(
+  CreateNamedStruct("att" :: idRef :: Nil).getStructField("att") as 
"outerAtt"
+   )
+
+assertResult(StructType(StructField("outerAtt", LongType, nullable = 
false) :: Nil))(rel.schema)
+
+val optimized = Optimize execute rel
+
+  

[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...

2017-02-04 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/16043#discussion_r99475096
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
 ---
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types._
+
+/**
+* SPARK-18601 discusses simplification direct access to complex types 
creators.
+* i.e. {{{create_named_struct(square, `x` * `x`).square}}} can be 
simplified to {{{`x` * `x`}}}.
+* sam applies to create_array and create_map
+*/
+class ComplexTypesSuite extends PlanTest{
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("collapse projections", FixedPoint(10),
+  CollapseProject) ::
+  Batch("Constant Folding", FixedPoint(10),
+  NullPropagation(conf),
+  ConstantFolding,
+  BooleanSimplification,
+  SimplifyConditionals,
+  SimplifyBinaryComparison,
+  SimplifyCreateStructOps,
+  SimplifyCreateArrayOps,
+  SimplifyCreateMapOps) :: Nil
+  }
+
+  val idAtt = ('id).long.notNull
+
+  lazy val baseOptimizedPlan = Range(1L, 1000L, 1, Some(2), idAtt :: Nil)
+
+  val idRef = baseOptimizedPlan.output.head
+
+  implicit class ComplexTypeDslSupport(e : Expression) {
+def getStructField(f : String): GetStructField = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[StructType])
+  val structType = e.dataType.asInstanceOf[StructType]
+  val ord = structType.fieldNames.indexOf(f)
+  assert(-1 != ord)
+  GetStructField(e, ord, Some(f))
+}
+
+def getArrayStructField(f : String) : Expression = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[ArrayType])
+  val arrType = e.dataType.asInstanceOf[ArrayType]
+  assert(arrType.elementType.isInstanceOf[StructType])
+  val structType = arrType.elementType.asInstanceOf[StructType]
+  val ord = structType.fieldNames.indexOf(f)
+  assert(-1 != ord)
+  GetArrayStructFields(e, structType(ord), ord, 1, 
arrType.containsNull)
+}
+
+def getArrayItem(i : Int) : GetArrayItem = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[ArrayType])
+  GetArrayItem(e, Literal(i))
+}
+
+def getMapValue(k : Expression) : Expression = {
+  assert(e.resolved)
+  assert(e.dataType.isInstanceOf[MapType])
+  val mapType = e.dataType.asInstanceOf[MapType]
+  assert(k.dataType == mapType.keyType)
+  GetMapValue(e, k)
+}
+
+def addCaseWhen( cond : Expression, v : Expression) : Expression = {
+  assert(e.resolved)
+  assert(e.isInstanceOf[CaseWhen])
+  assert(cond.dataType == BooleanType)
+  assert(v.dataType == e.dataType)
+  val CaseWhen(branches, default) = e
+  CaseWhen( branches :+ (cond, v), default)
+}
+  }
+
+  test("explicit") {
+val rel = baseOptimizedPlan.select(
+  CreateNamedStruct("att" :: idRef :: Nil).getStructField("att") as 
"outerAtt"
+   )
+
+assertResult(StructType(StructField("outerAtt", LongType, nullable = 
false) :: Nil))(rel.schema)
+
+val optimized = Optimize execute rel
+
+  

[GitHub] spark issue #16043: [SPARK-18601][SQL] Simplify Create/Get complex expressio...

2017-01-31 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/16043
  
@HyukjinKwon, @hvanhovell, are you familiar with this build failure? It seems 
to be unrelated to my specific changes...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...

2017-01-31 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/16043#discussion_r98660310
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+* push down operations into [[CreateNamedStructLike]].
+*/
+object SimplifyCreateStructOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp {
+  // push down field extraction
+  case GetStructField(createNamedStructLike: CreateNamedStructLike, 
ordinal, _) =>
+createNamedStructLike.valExprs(ordinal)
+}
+  }
+}
+
+/**
+* push down operations into [[CreateArray]].
+*/
+object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp {
+  // push down field selection (array of structs)
+  case GetArrayStructFields(CreateArray(elems), field, ordinal, 
numFields, containsNull) =>
+// instead f selecting the field on the entire array,
+// select it from each member of the array.
+// pushing down the operation this way open other optimizations 
opportunities
+// (i.e. struct(...,x,...).x)
+CreateArray(elems.map(GetStructField(_, ordinal, 
Some(field.name
+  // push down item selection.
+  case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
+// instead of creating the array and then selecting one row,
+// remove array creation altgether.
+if (idx >= 0 && idx < elems.size) {
+  // valid index
+  elems(idx)
+} else {
+  // out of bounds, mimic the runtime behavior and return null
+  Cast(Literal(null), ga.dataType)
--- End diff --

yep


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...

2017-01-31 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/16043#discussion_r98660085
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 ---
@@ -293,6 +293,12 @@ object SimplifyConditionals extends Rule[LogicalPlan] 
with PredicateHelper {
 // from that. Note that CaseWhen.branches should never be empty, 
and as a result the
 // headOption (rather than head) added above is just an extra (and 
unnecessary) safeguard.
 branches.head._2
+
+  case e @ CaseWhen(branches, _) if branches.exists(_._1 == 
Literal(true)) =>
+// a branc with a TRue condition eliminates all following branches,
+// these branches can be pruned away
+val (h, t) = branches.span(_._1 != Literal(true))
+CaseWhen( h :+ t.head, None)
--- End diff --

Sorry, could you please explain?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16043: [SPARK-18601][SQL] Simplify Create/Get complex expressio...

2017-01-31 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/16043
  
@hvanhovell, can you figure out what failed the build? It seems all tests passed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16043: [SPARK-18601][SQL] Simplify Create/Get complex expressio...

2017-01-27 Thread eyalfa
Github user eyalfa commented on the issue:

https://github.com/apache/spark/pull/16043
  
@hvanhovell, I'm currently in the process of refactoring; luckily I introduced
some tests in the initial sprint, so I've caught my mistakes and refreshed
my mind about the assumptions this code makes.

Regarding nulls: I'm using coalesce to add the first positive match after the
undetermined ones, but when one of the undetermined values is null, coalesce
would override it with the positive match's value.
This was handled in the normalize method of class ClassifiedKeys; part of
the refactoring involves removing this class and simplifying the decision
making around matches. (A small sketch of the null pitfall follows below.)

Will push later today, after running scalastyle on my branch.
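To make the null concern concrete, here is a tiny plain-Scala sketch (not Catalyst
code; `mapLookup`, `coalesceRewrite` and the sample entries are made up for the
illustration) of why a nullable undetermined entry blocks the straightforward
coalesce-based rewrite:

```scala
object CoalescePitfallSketch {
  // Map-lookup semantics: the first matching key wins, even when its value is null.
  def mapLookup(entries: Seq[(String, Integer)], key: String): Integer =
    entries.collectFirst { case (k, v) if k == key => v }.orNull

  // The tempting rewrite: look the key up among the undetermined entries first,
  // and fall back to the statically known positive match via a coalesce.
  def coalesceRewrite(undetermined: Seq[(String, Integer)],
                      positiveMatch: Integer,
                      key: String): Integer =
    Option(mapLookup(undetermined, key)).getOrElse(positiveMatch)

  def main(args: Array[String]): Unit = {
    // An "undetermined" entry that happens to match the key at runtime, with a null value.
    val undetermined = Seq(("k", null: Integer))
    val original = mapLookup(undetermined :+ (("k", 1: Integer)), "k")
    val rewritten = coalesceRewrite(undetermined, 1, "k")
    println(original)  // null -- the original map lookup honors the null value
    println(rewritten) // 1    -- coalesce skips the null, changing the result
  }
}
```

This mismatch is exactly what the normalize method guards against: when any
undetermined value is nullable, the positive match is kept among the undetermined
entries rather than pulled out behind a coalesce.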

On Jan 27, 2017 18:40, "Herman van Hovell" <notificati...@github.com> wrote:

> *@hvanhovell* commented on this pull request.
> --
>
> In sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/
> optimizer/ComplexTypes.scala <https://github.com/apache/spark/pull/16043>:
>
> > +}
> +}
> +val res = res1.normalize( requestedKey )
> +res
> +  }
> +
> +  override def apply(plan: LogicalPlan): LogicalPlan = {
> +plan.transformExpressionsUp {
> +  // attempt to unfold 'constant' key extraction,
> +  // this enables other optimizations to take place.
> +  case gmv @ GetMapValue(cm @ CreateMap(elems), key) =>
> +val kvs = cm.keys.zip(cm.values)
> +val classifiedEntries = classifyEntries(kvs, key)
> +classifiedEntries match {
> +  case ClassifiedEntries(Seq(), _, None) => Literal.create(null, 
gmv.dataType)
> +  case ClassifiedEntries(`elems`, _, None) => gmv
>
> It is fine, it just needs a little clarification.
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/16043>, or mute the thread
> 
<https://github.com/notifications/unsubscribe-auth/ABFFOV2D--1Wea5bmhkZVNwjmAZXxrtLks5rWh3lgaJpZM4K-Obo>
> .
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...

2017-01-27 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/16043#discussion_r98222460
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+* push down operations into [[CreateNamedStructLike]].
+*/
+object SimplifyCreateStructOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp {
+  // push down field extraction
+  case GetStructField(createNamedStructLike: CreateNamedStructLike, 
ordinal, _) =>
+createNamedStructLike.valExprs(ordinal)
+}
+  }
+}
+
+/**
+* push down operations into [[CreateArray]].
+*/
+object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp {
+  // push down field selection (array of structs)
+  case GetArrayStructFields(CreateArray(elems), field, ordinal, 
numFields, containsNull) =>
+CreateArray(elems.map(GetStructField(_, ordinal, 
Some(field.name
+  // push down item selection.
+  case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
+if (idx >= 0 && idx < elems.size) {
+  elems(idx)
+} else {
+  Cast(Literal(null), ga.dataType)
+}
+}
+  }
+}
+
+/**
+* push down operations into [[CreateMap]].
+*/
+object SimplifyCreateMapOps extends Rule[LogicalPlan] {
+  object ComparisonResult extends Enumeration {
+val PositiveMatch = Value
+val NegativeMatch = Value
+val UnDetermined = Value
+  }
+
+  def compareKeys(k1 : Expression, k2 : Expression) : 
ComparisonResult.Value = {
+(k1, k2) match {
+  case (x, y) if x.semanticEquals(y) => ComparisonResult.PositiveMatch
+  // make surethis is null safe, especially when datatypes differ
+  // is this even possible?
+  case (_ : Literal, _ : Literal) => ComparisonResult.NegativeMatch
+  case _ => ComparisonResult.UnDetermined
+}
+  }
+
+  case class ClassifiedEntries(
+undetermined : Seq[Expression],
+nullable : Boolean,
+firstPositive : Option[Expression]) {
+def normalize(k : Expression) : ClassifiedEntries = this match {
+  /**
+  * when we have undetermined matches that might bproduce a null value,
+  * we can't separate a positive match and use [[Coalesce]] to choose 
the final result.
+  * so we 'hide' the positive match as an undetermined match.
+  */
+  case ClassifiedEntries(u, true, Some(p)) if u.nonEmpty =>
+ClassifiedEntries(u ++ Seq(k, p), true, None)
+  case _ => this
+}
+  }
+
+  def classifyEntries(mapEntries : Seq[(Expression, Expression)],
+  requestedKey : Expression) : ClassifiedEntries = {
+val res1 = mapEntries.foldLeft(ClassifiedEntries(Seq.empty, nullable = 
false, None)) {
+  case (prev @ ClassifiedEntries(_, _, Some(_)), _) => prev
+  case (ClassifiedEntries(prev, nullable, None), (k, v)) =>
+compareKeys(k, requestedKey) match {
+  case ComparisonResult.UnDetermined =>
+val vIsNullable = v.nullable
+val nextNullbale = nullable || vIsNullable
+ClassifiedEntries(prev ++ Seq(k, v), nullable = nextNullbale, 
None)
+  case ComparisonResult.NegativeMatch => ClassifiedEntries(prev, 
nullable, None)
+  case ComparisonResult.PositiveMatch => Classified

[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...

2017-01-27 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/16043#discussion_r98216632
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+* push down operations into [[CreateNamedStructLike]].
+*/
+object SimplifyCreateStructOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp {
+  // push down field extraction
+  case GetStructField(createNamedStructLike: CreateNamedStructLike, 
ordinal, _) =>
+createNamedStructLike.valExprs(ordinal)
+}
+  }
+}
+
+/**
+* push down operations into [[CreateArray]].
+*/
+object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp {
+  // push down field selection (array of structs)
+  case GetArrayStructFields(CreateArray(elems), field, ordinal, 
numFields, containsNull) =>
+CreateArray(elems.map(GetStructField(_, ordinal, 
Some(field.name
+  // push down item selection.
+  case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
+if (idx >= 0 && idx < elems.size) {
+  elems(idx)
+} else {
+  Cast(Literal(null), ga.dataType)
+}
+}
+  }
+}
+
+/**
+* push down operations into [[CreateMap]].
+*/
+object SimplifyCreateMapOps extends Rule[LogicalPlan] {
+  object ComparisonResult extends Enumeration {
+val PositiveMatch = Value
+val NegativeMatch = Value
+val UnDetermined = Value
+  }
+
+  def compareKeys(k1 : Expression, k2 : Expression) : 
ComparisonResult.Value = {
+(k1, k2) match {
+  case (x, y) if x.semanticEquals(y) => ComparisonResult.PositiveMatch
+  // make surethis is null safe, especially when datatypes differ
+  // is this even possible?
+  case (_ : Literal, _ : Literal) => ComparisonResult.NegativeMatch
+  case _ => ComparisonResult.UnDetermined
+}
+  }
+
+  case class ClassifiedEntries(
+undetermined : Seq[Expression],
+nullable : Boolean,
+firstPositive : Option[Expression]) {
+def normalize(k : Expression) : ClassifiedEntries = this match {
+  /**
+  * when we have undetermined matches that might bproduce a null value,
+  * we can't separate a positive match and use [[Coalesce]] to choose 
the final result.
+  * so we 'hide' the positive match as an undetermined match.
+  */
+  case ClassifiedEntries(u, true, Some(p)) if u.nonEmpty =>
+ClassifiedEntries(u ++ Seq(k, p), true, None)
+  case _ => this
+}
+  }
+
+  def classifyEntries(mapEntries : Seq[(Expression, Expression)],
+  requestedKey : Expression) : ClassifiedEntries = {
+val res1 = mapEntries.foldLeft(ClassifiedEntries(Seq.empty, nullable = 
false, None)) {
+  case (prev @ ClassifiedEntries(_, _, Some(_)), _) => prev
+  case (ClassifiedEntries(prev, nullable, None), (k, v)) =>
+compareKeys(k, requestedKey) match {
+  case ComparisonResult.UnDetermined =>
+val vIsNullable = v.nullable
+val nextNullbale = nullable || vIsNullable
+ClassifiedEntries(prev ++ Seq(k, v), nullable = nextNullbale, 
None)
+  case ComparisonResult.NegativeMatch => ClassifiedEntries(prev, 
nullable, None)
+  case ComparisonResult.PositiveMatch => Classified

[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...

2017-01-27 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/16043#discussion_r98193617
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+* push down operations into [[CreateNamedStructLike]].
+*/
+object SimplifyCreateStructOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp {
+  // push down field extraction
+  case GetStructField(createNamedStructLike : CreateNamedStructLike, 
ordinal, _) =>
+createNamedStructLike.valExprs(ordinal)
+}
+  }
+}
+
+/**
+* push down operations into [[CreateArray]].
+*/
+object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp {
+  // push down field selection (array of structs)
+  case GetArrayStructFields(CreateArray(elems), field, ordinal, 
numFields, containsNull) =>
+CreateArray(elems.map(GetStructField(_, ordinal, 
Some(field.name
+  // push down item selection.
+  case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
+if (idx >= 0 && idx < elems.size) {
+  elems(idx)
+} else {
+  Cast(Literal(null), ga.dataType)
+}
+}
+  }
+}
+
+/**
+* push down operations into [[CreateMap]].
+*/
+object SimplifyCreateMapOps extends Rule[LogicalPlan] {
+  object ComparisonResult extends Enumeration {
+val PositiveMatch = Value
+val NegativeMatch = Value
+val UnDetermined = Value
+  }
+
+  def compareKeys(k1 : Expression, k2 : Expression) : 
ComparisonResult.Value = {
+(k1, k2) match {
+  case (x, y) if x.semanticEquals(y) => ComparisonResult.PositiveMatch
+  // make surethis is null safe, especially when datatypes differ
--- End diff --

I think my comment relates to the case where one literal is an integer while 
the other is a long. Not sure if this is even possible (as stated in the next 
comment),
but if it is, does null : Int equal null : Long? (A quick way to check this is 
sketched below.)
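For what it's worth, here is a quick, hypothetical way to check that question
against Catalyst directly (assuming a spark-catalyst dependency on the classpath;
this snippet is not part of the PR):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{IntegerType, LongType}

object NullLiteralCheck {
  def main(args: Array[String]): Unit = {
    val nullInt  = Literal(null, IntegerType)
    val nullLong = Literal(null, LongType)
    // semanticEquals compares canonicalized expressions, and a Literal's data type
    // is part of its identity, so these two null literals are not considered equal;
    // under the compareKeys sketched in this diff they would fall through to a
    // NegativeMatch rather than a PositiveMatch.
    println(nullInt.semanticEquals(nullLong))                   // expected: false
    println(nullInt.semanticEquals(Literal(null, IntegerType))) // expected: true
  }
}
```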


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...

2017-01-27 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/16043#discussion_r98192298
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
 ---
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.scalatest.Matchers
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, CreateArray, 
CreateMap, CreateNamedStruct, Expression, GetArrayItem, GetArrayStructFields, 
GetMapValue, GetStructField, Literal}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types._
+
+/**
+* Created by eyalf on 11/4/2016.
+* SPARK-18601 discusses simplification direct access to complex types 
creators.
+* i.e. {{{create_named_struct(square, `x` * `x`).square}}} can be 
simplified to {{{`x` * `x`}}}.
+* sam applies to create_array and create_map
+*/
+class ComplexTypesSuite extends PlanTest with Matchers{
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+val batches =
+  Batch("collapse projections", FixedPoint(10),
+  CollapseProject) ::
+  Batch("Constant Folding", FixedPoint(10),
+  NullPropagation,
+  ConstantFolding,
+  BooleanSimplification,
+  SimplifyConditionals,
+  SimplifyCreateStructOps,
+  SimplifyCreateArrayOps,
+  SimplifyCreateMapOps) :: Nil
+  }
+
+  val idAtt = ('id).long.notNull
+
+  lazy val baseOptimizedPlan = Range(1L, 1000L, 1, Some(2), idAtt :: Nil)
+
+  val idRef = baseOptimizedPlan.output.head
+
+
+//  val idRefColumn = Column("id")
+//  val struct1RefColumn = Column("struct1")
+
+  implicit class ComplexTypeDslSupport(e : Expression) {
+def getStructField(f : String): GetStructField = {
+  e should be ('resolved)
+  e.dataType should be (a[StructType])
--- End diff --

will change to assertion


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...

2017-01-27 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/16043#discussion_r98192112
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Cast, Coalesce, 
CreateArray, CreateMap, CreateNamedStructLike, Expression, GetArrayItem, 
GetArrayStructFields, GetMapValue, GetStructField, IntegerLiteral, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+* push down operations into [[CreateNamedStructLike]].
+*/
+object SimplifyCreateStructOps extends Rule[LogicalPlan]{
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp{
+  // push down field extraction
+  case GetStructField(createNamedStructLike : CreateNamedStructLike, 
ordinal, _) =>
+createNamedStructLike.valExprs(ordinal)
+}
+  }
+}
+
+/**
+* push down operations into [[CreateArray]].
+*/
+object SimplifyCreateArrayOps extends Rule[LogicalPlan]{
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp{
+  // push down field selection (array of structs)
+  case GetArrayStructFields(CreateArray(elems), field, ordinal, 
numFields, containsNull) =>
+def getStructField(elem : Expression) = {
+  GetStructField(elem, ordinal, Some(field.name))
+}
+CreateArray(elems.map(getStructField))
+  // push down item selection.
+  case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
+if (idx >= 0 && idx < elems.size) {
+  elems(idx)
+} else {
+  Cast(Literal(null), ga.dataType)
+}
+}
+  }
+}
+
+/**
+* push down operations into [[CreateMap]].
+*/
+object SimplifyCreateMapOps extends Rule[LogicalPlan]{
+  object ComparisonResult extends Enumeration {
+val PositiveMatch = Value
+val NegativeMatch = Value
+val UnDetermined = Value
+  }
+
+  def compareKeys(k1 : Expression, k2 : Expression) : 
ComparisonResult.Value = {
+(k1, k2) match {
+  case (x, y) if x.semanticEquals(y) => ComparisonResult.PositiveMatch
+  // make surethis is null safe, especially when datatypes differ
+  // is this even possible?
+  case (_ : Literal, _ : Literal) => ComparisonResult.NegativeMatch
+  case _ => ComparisonResult.UnDetermined
+}
+  }
+
+  case class ClassifiedEntries(undetermined : Seq[Expression],
+   nullable : Boolean,
+   firstPositive : Option[Expression]) {
+def normalize( k : Expression ) : ClassifiedEntries = this match {
+  /**
+  * when we have undetermined matches that might bproduce a null value,
+  * we can't separate a positive match and use [[Coalesce]] to choose 
the final result.
+  * so we 'hide' the positive match as an undetermined match.
+  */
+  case ClassifiedEntries( u, true, Some(p)) if u.nonEmpty =>
+ClassifiedEntries(u ++ Seq(k, p), true, None)
+  case _ => this
+}
+  }
+
+  def classifyEntries(mapEntries : Seq[(Expression, Expression)],
+  requestedKey : Expression) : ClassifiedEntries = {
+val res1 = mapEntries.foldLeft(ClassifiedEntries(Seq.empty, nullable = 
false, None)) {
+  case (prev @ ClassifiedEntries(_, _, Some(_)), _) => prev
+  case (ClassifiedEntries(prev, nullable, None), (k, v)) =>
+compareKeys(k, requestedKey) match {
+  case ComparisonResult.UnDetermined =>
+val vIsNullable = v.n

[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...

2017-01-27 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/16043#discussion_r98191785
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+* push down operations into [[CreateNamedStructLike]].
+*/
+object SimplifyCreateStructOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp {
+  // push down field extraction
+  case GetStructField(createNamedStructLike: CreateNamedStructLike, 
ordinal, _) =>
+createNamedStructLike.valExprs(ordinal)
+}
+  }
+}
+
+/**
+* push down operations into [[CreateArray]].
+*/
+object SimplifyCreateArrayOps extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.transformExpressionsUp {
+  // push down field selection (array of structs)
+  case GetArrayStructFields(CreateArray(elems), field, ordinal, 
numFields, containsNull) =>
+CreateArray(elems.map(GetStructField(_, ordinal, 
Some(field.name
+  // push down item selection.
+  case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) =>
+if (idx >= 0 && idx < elems.size) {
+  elems(idx)
+} else {
+  Cast(Literal(null), ga.dataType)
+}
+}
+  }
+}
+
+/**
+* push down operations into [[CreateMap]].
+*/
+object SimplifyCreateMapOps extends Rule[LogicalPlan] {
+  object ComparisonResult extends Enumeration {
+val PositiveMatch = Value
+val NegativeMatch = Value
+val UnDetermined = Value
+  }
+
+  def compareKeys(k1 : Expression, k2 : Expression) : 
ComparisonResult.Value = {
+(k1, k2) match {
+  case (x, y) if x.semanticEquals(y) => ComparisonResult.PositiveMatch
+  // make surethis is null safe, especially when datatypes differ
+  // is this even possible?
+  case (_ : Literal, _ : Literal) => ComparisonResult.NegativeMatch
+  case _ => ComparisonResult.UnDetermined
+}
+  }
+
+  case class ClassifiedEntries(
+undetermined : Seq[Expression],
+nullable : Boolean,
+firstPositive : Option[Expression]) {
+def normalize(k : Expression) : ClassifiedEntries = this match {
+  /**
+  * when we have undetermined matches that might bproduce a null value,
+  * we can't separate a positive match and use [[Coalesce]] to choose 
the final result.
+  * so we 'hide' the positive match as an undetermined match.
+  */
+  case ClassifiedEntries(u, true, Some(p)) if u.nonEmpty =>
+ClassifiedEntries(u ++ Seq(k, p), true, None)
+  case _ => this
+}
+  }
+
+  def classifyEntries(mapEntries : Seq[(Expression, Expression)],
+  requestedKey : Expression) : ClassifiedEntries = {
+val res1 = mapEntries.foldLeft(ClassifiedEntries(Seq.empty, nullable = 
false, None)) {
+  case (prev @ ClassifiedEntries(_, _, Some(_)), _) => prev
+  case (ClassifiedEntries(prev, nullable, None), (k, v)) =>
+compareKeys(k, requestedKey) match {
+  case ComparisonResult.UnDetermined =>
+val vIsNullable = v.nullable
+val nextNullbale = nullable || vIsNullable
+ClassifiedEntries(prev ++ Seq(k, v), nullable = nextNullbale, 
None)
+  case ComparisonResult.NegativeMatch => ClassifiedEntries(prev, 
nullable, None)
+  case ComparisonResult.PositiveMatch => Classified
