[GitHub] spark issue #19810: [SPARK-22599][SQL] In-Memory Table Pruning without Extra...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/19810 @maropu , this looks rather cold :sunglasses: , but extremely interesting and relevant. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19810#discussion_r218743585 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.columnar + +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel} + +private[columnar] class CachedColumnarRDD( +@transient private var _sc: SparkContext, +private var dataRDD: RDD[CachedBatch], +private[columnar] val containsPartitionMetadata: Boolean, +expectedStorageLevel: StorageLevel) + extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) { + + override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = { +firstParent.iterator(split, context) + } + + override def unpersist(blocking: Boolean = true): this.type = { +CachedColumnarRDD.allMetadataFetched.remove(id) +CachedColumnarRDD.rddIdToMetadata.remove(id) +super.unpersist(blocking) + } + + override protected def getPartitions: Array[Partition] = dataRDD.partitions + + override private[spark] def getOrCompute(split: Partition, context: TaskContext): + Iterator[CachedBatch] = { +val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index) +val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute + SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(_ => + superGetOrCompute(split, context) +).getOrElse { + val batchIter = superGetOrCompute(split, context) + if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) { +val cachedBatch = batchIter.next() --- End diff -- assert `!batchIter.hasNext`, you expect this partition to contain a single batch --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19810#discussion_r218716111 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.columnar + +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel} + +private[columnar] class CachedColumnarRDD( +@transient private var _sc: SparkContext, +private var dataRDD: RDD[CachedBatch], +private[columnar] val containsPartitionMetadata: Boolean, +expectedStorageLevel: StorageLevel) + extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) { + + override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = { +firstParent.iterator(split, context) + } + + override def unpersist(blocking: Boolean = true): this.type = { +CachedColumnarRDD.allMetadataFetched.remove(id) +CachedColumnarRDD.rddIdToMetadata.remove(id) +super.unpersist(blocking) + } + + override protected def getPartitions: Array[Partition] = dataRDD.partitions + + override private[spark] def getOrCompute(split: Partition, context: TaskContext): + Iterator[CachedBatch] = { --- End diff -- can this be avoided by maintaining two (zipped) RDDs? one of `CachedBatch`s and the other holding only the stats? can this approach avoid the need for a specialized block type for managing metadata? correct me if i'm wrong, but first time the stats are accessed, your approach performs a full scan to extract the stats (happens in the sql code), so having a second RDD which is something like : `batches.map(_.stats).persist` should give the same behavior, right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19810#discussion_r218714493 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.columnar + +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel} + +private[columnar] class CachedColumnarRDD( +@transient private var _sc: SparkContext, +private var dataRDD: RDD[CachedBatch], +private[columnar] val containsPartitionMetadata: Boolean, +expectedStorageLevel: StorageLevel) + extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) { + + override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = { +firstParent.iterator(split, context) + } + + override def unpersist(blocking: Boolean = true): this.type = { +CachedColumnarRDD.allMetadataFetched.remove(id) +CachedColumnarRDD.rddIdToMetadata.remove(id) +super.unpersist(blocking) + } + + override protected def getPartitions: Array[Partition] = dataRDD.partitions + + override private[spark] def getOrCompute(split: Partition, context: TaskContext): + Iterator[CachedBatch] = { +val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index) +val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute + SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(_ => + superGetOrCompute(split, context) +).getOrElse { + val batchIter = superGetOrCompute(split, context) + if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) { +val cachedBatch = batchIter.next() +SparkEnv.get.blockManager.putSingle(metadataBlockId, cachedBatch.stats, + expectedStorageLevel) +new InterruptibleIterator[CachedBatch](context, Iterator(cachedBatch)) + } else { +batchIter + } +} + } +} + +private[columnar] object CachedColumnarRDD { + + private val rddIdToMetadata = new ConcurrentHashMap[Int, 
mutable.ArraySeq[Option[InternalRow]]]() --- End diff -- could these be moved to become a member of the RDD class? seems like a map of this->some.property, in this case can be made an instance member. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19810#discussion_r218705219 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala --- @@ -193,38 +195,68 @@ case class InMemoryTableScanExec( private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning + private def doFilterCachedBatches( + rdd: RDD[CachedBatch], + partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = { +val schemaIndex = partitionStatsSchema.zipWithIndex +rdd.mapPartitionsWithIndex { + case (partitionIndex, cachedBatches) => +if (inMemoryPartitionPruningEnabled) { + cachedBatches.filter { cachedBatch => +val partitionFilter = newPredicate( --- End diff -- can this be pulled up out of `usePartitionLevelMetadata` ? seems like you're constructing the predicate per record --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19810#discussion_r218704390 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -52,6 +52,68 @@ object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) +private[columnar] class CachedBatchIterator( +rowIterator: Iterator[InternalRow], +output: Seq[Attribute], +batchSize: Int, +useCompression: Boolean, +batchStats: LongAccumulator, +singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] { + + def next(): CachedBatch = { +val columnBuilders = output.map { attribute => + ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) +}.toArray + +var rowCount = 0 +var totalSize = 0L + +val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow], + rowCount: Int, size: Long) => { + if (!singleBatch) { +rowIter.hasNext && rowCount < batchSize && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE + } else { +rowIter.hasNext --- End diff -- doesn't this run the risk of OOM for large partitions? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22432: [SPARK-22713][CORE][TEST][FOLLOWUP] Fix flaky Ext...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/22432#discussion_r217902092 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -457,7 +458,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala // (lines 69-89) // assert(map.currentMap == null) -eventually { +eventually(timeout(5 seconds), interval(200 milliseconds)) { --- End diff -- @dongjoon-hyun, thanks for cleaning up my mess! :sunglasses: --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22432: [SPARK-22713][CORE][TEST][FOLLOWUP] Fix flaky Ext...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/22432#discussion_r217901988 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -457,7 +458,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala // (lines 69-89) // assert(map.currentMap == null) -eventually { +eventually(timeout(5 seconds), interval(200 milliseconds)) { --- End diff -- I think the best practice to counter this is specifying the patience config in sbt's test options. that said, I once had to 'grant' a longer timeout to a specific class, which I achieved by overriding the `spanScaleFactor` method: `override def spanScaleFactor: Double = super.spanScaleFactor * 3`. please notice that there's another usage of `eventually` in line 519; this one 'manually' waits 500 millis before testing, which might explain why you didn't see it failing in CI. looking at it now, it seems like a bad practice since `eventually` is designed to control both the timeout and the intervals between tries. to summarize: best practice is to control this in sbt's test settings, if needed you can further control it in a specific class, in any case you have to make sure you handle all invocations of `eventually` (which is easier and less error prone by leveraging scalaTest's mechanisms). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17400: [SPARK-19981][SQL] Respect aliases in output part...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/17400#discussion_r211565064 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -321,6 +321,58 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { } } + private def updatePartitioningByAliases(exprs: Seq[NamedExpression], partioning: Partitioning) +: Partitioning = { +val aliasSeq = exprs.flatMap(_.collectFirst { --- End diff -- @maropu , I didn't aim for supporting complex partitioning expressions (which deserves its own separate PR), I meant that this code could introduce regressions by 'over-capturing' nested aliases. * my specific example is wrong since struct is transformed into a named struct (alias is replaced by an explicit name). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17400: [SPARK-19981][SQL] Update output partitioning info. when...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/17400 thanks @maropu I appreciate this. must say I'm pretty surprised a bug like that lives so long... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17400: [SPARK-19981][SQL] Update output partitioning info. when...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/17400 in my use case, I aggregate a dataset, then use select to align columns with a case-class. I later try to join the resulting dataset based on the same columns used for aggregation. the join introduces shuffles (exchange nodes). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17400: [SPARK-19981][SQL] Update output partitioning info. when...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/17400 @maropu , yes it does :-) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17400: [SPARK-19981][SQL] Update output partitioning inf...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/17400#discussion_r209909106 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -321,6 +321,58 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { } } + private def updatePartitioningByAliases(exprs: Seq[NamedExpression], partioning: Partitioning) +: Partitioning = { +val aliasSeq = exprs.flatMap(_.collectFirst { --- End diff -- this might do more than you'd like it to (at least if it behaves the way I understand `collectFirst`), e.g. `df.select($"x" as "x1", struct($"a" as "a1", $"b" as "b1") as "s1")` _x1_ and _s1_ are aliases, _a1_ and _b1_ are not. it could even get more complicated if there was an _a1_ alias in the top level projections list. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17400: [SPARK-19981][SQL] Update output partitioning info. when...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/17400 @maropu , any reason why this is on hold for so long? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/21369 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r209499005 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream --- End diff -- @cloud-fan , do you think this is worth doing, I'm referring to the CompletionIterator delaying GC of the sub iterator and cleanup function (usually a closure referring to a larger collection). if so, I'd open a separate JIRA+PR for this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/21369 @hvanhovell ,thanks for picking this up :sunglasses: --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/21369 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r198816624 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } - test("external aggregation updates peak execution memory") { + test("SPARK-22713 spill during iteration leaks internal map") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] --- End diff -- @cloud-fan ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r192631230 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } - test("external aggregation updates peak execution memory") { + test("SPARK-22713 spill during iteration leaks internal map") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] --- End diff -- @cloud-fan , can we move on with this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r191655937 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } - test("external aggregation updates peak execution memory") { + test("SPARK-22713 spill during iteration leaks internal map") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] --- End diff -- @cloud-fan, how do you suggest to progress with this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r191077231 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } - test("external aggregation updates peak execution memory") { + test("SPARK-22713 spill during iteration leaks internal map") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] --- End diff -- @cloud-fan , I've tried the `System.gc` approach with and without `eventually`, to no avail. seems like scalatest's asserts use some black magic macro based voodoo to generate a local method/val (when I got about 10 classes deep into my quest, I gave up) basically what I'm seeing in visualvm is the expected ref via the `WeakReference` and another surprising one via `UnaryMacroBool.left`. it seems this sneaky ref was generated by the following assertion: `assert(map.currentMap == null)`, but it could have easily been generated elsewhere. @cloud-fan , @gatorsmile , can you please confirm whether and how we can import the scala code? otherwise, can you think of an alternative approach for testing this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190829341 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } - test("external aggregation updates peak execution memory") { + test("SPARK-22713 spill during iteration leaks internal map") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] --- End diff -- this requires using something like [scalatest's eventually](http://doc.scalatest.org/1.8/org/scalatest/concurrent/Eventually.html), don't you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/21369 @cloud-fan , I guess we can copy the test util from scala as long as it doesn't violate licensing or credits, please advise on the proper way of doing that and where do you want to place the code? Does spark have a testing lib that can be reused? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190542635 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) +val underlyingIt = map.readingIterator +assert( underlyingIt != null ) +val underlyingMapIterator = underlyingIt.upstream +assert(underlyingMapIterator != null) +val underlyingMapIteratorClass = underlyingMapIterator.getClass +assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) + +val underlyingMap = map.currentMap +assert(underlyingMap != null) + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert( map.numSpills == 0 ) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. +// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) +assert(underlyingIt.upstream ne underlyingMapIterator) +assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) +assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) --- End diff -- hmm, we can in line 508 but not in this test. 
in this test we look at the iterator immediately after a spill, at this point upstream is supposed to be replaced by a `DiskMapIterator`, I guess we can check for this directly (after relaxing its visibility to package private). in line 508, we can simply compare with Iterator.empty --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190375595 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream --- End diff -- @cloud-fan , the assumption here is that there are two references to the underlying map: the upstream iterator and the external map itself. the destroy method first removes the ref via upstream-iterator then delegates to the method that clears the ref via the external map member (currentMap I think), so unless we've missed another ref we should be fine. as I wrote above, I think there's a potentially more fundamental issue with `CompletionIterator` which keeps holding references via its `sub` and `completionFunction` members, these might stall some objects from being collected and can be eliminated upon exhaustion of the iterator. there might be some more 'candidates' like `LazyIterator` and `InterruptibleIterator`, I think this deserves some more investigation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190372506 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) --- End diff -- @cloud-fan , the assumption here is that there are two references to the underlying map: the upstream iterator and the external map itself. the destroy method first removes the ref via upstream-iterator then delegates to the method that clears the ref via the external map member (`currentMap` I think), so unless we've missed another ref we should be fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190371425 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +private def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty +} + +private[ExternalAppendOnlyMap] --- End diff -- hmm... the class itself is private (slightly relaxed to package private to ease testing) so I'm not sure what's the benefit in making the method public, in any case I think that once we see the use case for making this method public we'd probably have to further change the iterator/external map classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190370842 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) +val underlyingIt = map.readingIterator +assert( underlyingIt != null ) +val underlyingMapIterator = underlyingIt.upstream +assert(underlyingMapIterator != null) +val underlyingMapIteratorClass = underlyingMapIterator.getClass +assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) + +val underlyingMap = map.currentMap +assert(underlyingMap != null) + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert( map.numSpills == 0 ) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. 
+// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) +assert(underlyingIt.upstream ne underlyingMapIterator) +assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) +assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) + +val next50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert(!it.hasNext) +val keys = (first50Keys ++ next50Keys).sorted +assert(keys == (0 until 100)) + } + + test("drop all references to the underlying map once the iterator is exhausted") { --- End diff -- :+1: --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190370765 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) +val underlyingIt = map.readingIterator +assert( underlyingIt != null ) +val underlyingMapIterator = underlyingIt.upstream +assert(underlyingMapIterator != null) +val underlyingMapIteratorClass = underlyingMapIterator.getClass +assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) + +val underlyingMap = map.currentMap +assert(underlyingMap != null) + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert( map.numSpills == 0 ) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. 
+// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) +assert(underlyingIt.upstream ne underlyingMapIterator) +assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) +assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) --- End diff -- the underlying map's iterator is an anonymous class, this is the best I could come up with to check if the upstream iterator holds a ref to the underlying map. @cloud-fan , do you have a better idea (I'm not 100% happy with this one) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189921783 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { --- End diff -- This test was written BEFORE the actual fix and it did fail up until the fix was in place. I do agree it's a bit clumsy and potential future changes may break the original intention of the test. I've referred to a potential testing approach (currently limited to scala's source code) which couldn't be (easily) applied to this code base so I made a best effort to test this. I agree this needs better documentation, I'll start by referring to the issue in the test's name and will also add comments to the code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189919617 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +private def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty --- End diff -- Safer, class remains usable if for some reason hasNext is called again, and this costs absolutely nothing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189919031 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C]( */ def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = { readingIterator = new SpillableIterator(inMemoryIterator) -readingIterator +readingIterator.toCompletionIterator --- End diff -- What behavior does it change? Your suggested code does exactly the same but is less streamlined and relies on an intermediate value (fortunately it's already a member variable) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/21369 @advancedxy , using jvisualvm+heap dump I could see that the second introduced test case ("drop all references to the underlying map once the iterator is exhausted") eliminated all references to the underlying map: heap contained one instance of `SizeTrackingAppendOnlyMap`, but all references to it were unreachable, hence it was due to be evicted. found one instance of CompletionIterator (actually anonymous class deriving it) which had references to the `SpillableIterator`, direct ref as member 'sub' and another one via member `completionFunction$1`. the `SpillableIterator` had a single ref to the `ExternalAppendOnlyMap` which had its `currentMap` field already nullified. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189794281 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty +} + +def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = { --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189794046 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -305,8 +310,8 @@ class ExternalAppendOnlyMap[K, V, C]( // Input streams are derived both from the in-memory map and spilled maps on disk // The in-memory map is sorted in place, while the spilled maps are already in sorted order -private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator( - currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap()) +private val sortedMap = destructiveIterator( + currentMap.destructiveSortedIterator(keyComparator)) --- End diff -- unfortunately no, scala-style enforces a max of 100 chars per line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189794097 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,24 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +def destroy() : Unit = { --- End diff -- yes, fixing --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/21369 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/21369 well, I took the time trying to figure out how the iterator is eventually being used, (most of) it boils down to `org.apache.spark.scheduler.ShuffleMapTask#runTask` which does: `writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])` looking at `org.apache.spark.shuffle.ShuffleWriter#write` implementations, it seems all of them first exhaust the iterator and then perform some kind of postprocessing: i.e. merging spills, sorting, writing partition files and then concatenating them into a single file... bottom line the Iterator may actually be 'sitting' for some time after reaching EOF. I'll implement the 'simple approach' for this PR, but I think this deserves a separate JIRA issue + PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189452094 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream --- End diff -- @jerrylead, I'd appreciate if you could test this. One more thing that bugs me is that there's another case when the iterator no longer needs the upstream iterator/underlying map but still holds a reference to it: When the iterator reaches EOF its hasNext method returns false which causes the wrapping CompletionIterator to call the cleanup function which simply nulls the underlying map member variable in ExternalAppendOnlyMap, the iterator member is not nulled out so we end up with the CompletionIterator holding two paths to the upstream iterator which leads to the underlying map: first it still holds a reference to the iterator itself, however it still holds a reference to the cleanup closure which refers to the ExternalAppendOnlyMap which still refers to the current iterator which refers upstream... This can be solved in one of two ways: Simple way, when creating the completion iterator, provide a closure referring the iterator, not the ExternalAppendOnlyMap. Thorough way, modify completion iterator to null out references after cleaning up. Having said that, I'm not sure how long a completed iterator may be 'sitting' before being discarded so I'm not sure if this is worth fixing, especially using the thorough approach. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189438351 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream --- End diff -- Basically yes, according to my understanding of the code this should have happened on the subsequent hasNext/next call. However, according to the analysis in the jira the iterator kept holding this reference. My guess: at this point the entire program started suffering lengthy GC pauses that got it into behaving as if under a deadlock, effectively leaving the ref in place (just a guess) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/21369 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/21369 @lianhuiwang, @davies, @hvanhovell can you please have a look? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
GitHub user eyalfa opened a pull request: https://github.com/apache/spark/pull/21369 [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spilled during iteration ## What changes were proposed in this pull request? This PR solves [SPARK-22713](https://issues.apache.org/jira/browse/SPARK-22713) which describes a memory leak that occurs when an ExternalAppendOnlyMap is spilled during iteration (as opposed to insertion). (Please fill in changes proposed in this fix) ExternalAppendOnlyMap's iterator supports spilling but it kept a reference to the internal map (via an internal iterator) after spilling, it seems that the original code was actually supposed to 'get rid' of this reference on the next iteration but according to the elaborate investigation described in the JIRA this didn't happen. the fix was simply replacing the internal iterator immediately after spilling. ## How was this patch tested? I've introduced a new test to test suite ExternalAppendOnlyMapSuite, this test asserts that neither the external map itself nor its iterator hold any reference to the internal map after a spill. This approach required some access relaxation of some member variables and nested classes of ExternalAppendOnlyMap, these members are now package private and annotated with @VisibleForTesting. You can merge this pull request into a Git repository by running: $ git pull https://github.com/eyalfa/spark SPARK-22713__ExternalAppendOnlyMap_effective_spill Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21369.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21369 commit 1c4a6af2077e7ac50476101031aa1af99ff4f7b7 Author: Eyal Farago <eyal@...> Date: 2018-05-18T22:06:15Z SPARK-22713__ExternalAppendOnlyMap_effective_spill: add failing test. 
commit 82591e63bf30fad37b90956c226101e428d39787 Author: Eyal Farago <eyal@...> Date: 2018-05-18T22:21:33Z SPARK-22713__ExternalAppendOnlyMap_effective_spill: fix the issue by removing the reference to the initial iterator. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19054#discussion_r165861581 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -220,45 +220,99 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { operator.withNewChildren(children) } + private def isSubset(biggerSet: Seq[Expression], smallerSet: Seq[Expression]): Boolean = +smallerSet.length <= biggerSet.length && + smallerSet.forall(x => biggerSet.exists(_.semanticEquals(x))) + + /** + * Reorders `leftKeys` and `rightKeys` by aligning `currentOrderOfKeys` to be a prefix of + * `expectedOrderOfKeys` + */ private def reorder( leftKeys: Seq[Expression], rightKeys: Seq[Expression], - expectedOrderOfKeys: Seq[Expression], - currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = { -val leftKeysBuffer = ArrayBuffer[Expression]() -val rightKeysBuffer = ArrayBuffer[Expression]() + expectedOrderOfKeys: Seq[Expression], // comes from child's output partitioning + currentOrderOfKeys: Seq[Expression]): // comes from join predicate + (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = { + +assert(leftKeys.length == rightKeys.length) + +val allLeftKeys = ArrayBuffer[Expression]() +val allRightKeys = ArrayBuffer[Expression]() +val reorderedLeftKeys = ArrayBuffer[Expression]() +val reorderedRightKeys = ArrayBuffer[Expression]() + +// Tracking indicies here to track to which keys are accounted. Using a set based approach +// won't work because its possible that some keys are repeated in the join clause +// eg. 
a.key1 = b.key1 AND a.key1 = b.key2 +val processedIndicies = mutable.Set[Int]() expectedOrderOfKeys.foreach(expression => { - val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression)) - leftKeysBuffer.append(leftKeys(index)) - rightKeysBuffer.append(rightKeys(index)) + val index = currentOrderOfKeys.zipWithIndex.find { case (currKey, i) => +!processedIndicies.contains(i) && currKey.semanticEquals(expression) + }.get._2 --- End diff -- is the find guaranteed to always succeed? if so, worth a comment on method's pre/post conditions. a getOrElse(sys error "...") might also be a good way of documenting this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19054: [SPARK-18067] Avoid shuffling child if join keys ...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19054#discussion_r165860433 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala --- @@ -220,45 +220,99 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { operator.withNewChildren(children) } + private def isSubset(biggerSet: Seq[Expression], smallerSet: Seq[Expression]): Boolean = +smallerSet.length <= biggerSet.length && + smallerSet.forall(x => biggerSet.exists(_.semanticEquals(x))) + + /** + * Reorders `leftKeys` and `rightKeys` by aligning `currentOrderOfKeys` to be a prefix of + * `expectedOrderOfKeys` + */ private def reorder( leftKeys: Seq[Expression], rightKeys: Seq[Expression], - expectedOrderOfKeys: Seq[Expression], - currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = { -val leftKeysBuffer = ArrayBuffer[Expression]() -val rightKeysBuffer = ArrayBuffer[Expression]() + expectedOrderOfKeys: Seq[Expression], // comes from child's output partitioning + currentOrderOfKeys: Seq[Expression]): // comes from join predicate + (Seq[Expression], Seq[Expression], Seq[Expression], Seq[Expression]) = { --- End diff -- can you please add a comment describing the return type? a tuple4 is not such a descriptive type :smiley: --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19481: [SPARK-21907][CORE][BACKPORT 2.2] oom during spil...
Github user eyalfa closed the pull request at: https://github.com/apache/spark/pull/19481 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19481: [SPARK-21907][CORE][BACKPORT 2.2] oom during spill
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/19481 @hvanhovell , @juliuszsompolski --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19481: [SPARK-21907][CORE][BACKPORT 2.2] oom during spil...
GitHub user eyalfa opened a pull request: https://github.com/apache/spark/pull/19481 [SPARK-21907][CORE][BACKPORT 2.2] oom during spill back-port #19181 to branch-2.2. 1. a test reproducing [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907) 2. a fix for the root cause of the issue. `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset` which may trigger another spill, when this happens the `array` member is already de-allocated but still referenced by the code, this causes the nested spill to fail with an NPE in `org.apache.spark.memory.TaskMemoryManager.getPage`. This patch introduces a reproduction in a test case and a fix, the fix simply sets the in-mem sorter's array member to an empty array before actually performing the allocation. This prevents the spilling code from 'touching' the de-allocated array. introduced a new test case: `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`. Author: Eyal Farago <e...@nrgene.com> Closes #19181 from eyalfa/SPARK-21907__oom_during_spill. ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/eyalfa/spark SPARK-21907__oom_during_spill__BACKPORT-2.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19481.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19481 commit 9b6c5d2672a628951a6f35802dd569ea91a544a7 Author: Eyal Farago <e...@nrgene.com> Date: 2017-10-10T20:49:47Z [SPARK-21907][CORE] oom during spill 1. a test reproducing [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907) 2. a fix for the root cause of the issue. `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset` which may trigger another spill, when this happens the `array` member is already de-allocated but still referenced by the code, this causes the nested spill to fail with an NPE in `org.apache.spark.memory.TaskMemoryManager.getPage`. This patch introduces a reproduction in a test case and a fix, the fix simply sets the in-mem sorter's array member to an empty array before actually performing the allocation. This prevents the spilling code from 'touching' the de-allocated array. introduced a new test case: `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`. Author: Eyal Farago <e...@nrgene.com> Closes #19181 from eyalfa/SPARK-21907__oom_during_spill. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/19181 @hvanhovell , thanks :+1: --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r142005517 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java --- @@ -503,6 +511,39 @@ public void testGetIterator() throws Exception { verifyIntIterator(sorter.getIterator(279), 279, 300); } + @Test + public void testOOMDuringSpill() throws Exception { +final UnsafeExternalSorter sorter = newSorter(); +for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) { + insertNumber(sorter, i); +} +// todo: this might actually not be zero if pageSize is somehow configured differently, +// so we actually have to compute the expected spill size according to the configured page size +assertEquals(0, sorter.getSpillSize()); --- End diff -- well, it's a tricky one... as far as I can track the construction and logic of the relevant classes here, and given that we're sorting Ints here, we're not supposed to spill before expandPointersArray requests additional memory, it is however very difficult and 'involved' to actually check this during test. I can set the memoryManager to choke **before** the first loop, this way if my assumption breaks and the sorter attempts to allocate memory we'd get an OOM sooner than expected, effectively failing the test. @juliuszsompolski , do you find this approach better? it'd still need a comment describing the assumption about memory usage by the sorter. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/19181 @hvanhovell ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/19181 @hvanhovell , PTAL let me know if there's anything that requires fixing here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r139594335 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java --- @@ -503,6 +511,39 @@ public void testGetIterator() throws Exception { verifyIntIterator(sorter.getIterator(279), 279, 300); } + @Test + public void testOOMDuringSpill() throws Exception { +final UnsafeExternalSorter sorter = newSorter(); +for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) { --- End diff -- according to [Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81782/testReport/org.apache.spark.util.collection.unsafe.sort/UnsafeExternalSorterSuite/testOOMDuringSpill/): 7ms, I guess we're ok then :sunglasses: --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/19181 I agree, but 'it is what it is'. We can probably come up with some mechanism that detects such scenarios and avoids invoking the spill method on an object that's already 'on the stack', I think this requires more involvement from the committers, perhaps in a separate Jira? On Sep 15, 2017 14:27, "Juliusz Sompolski" <notificati...@github.com> wrote: > @eyalfa <https://github.com/eyalfa> Thanks. I agree that this is a good > fix to this issue and lgtm. > I'm just worried that there are more lurking cases where a nested spill > can trigger and cause something unexpected, and that finding and > reproducing such other similar issues may be difficult. > > — > You are receiving this because you were mentioned. > Reply to this email directly, view it on GitHub > <https://github.com/apache/spark/pull/19181#issuecomment-32971>, or mute > the thread > <https://github.com/notifications/unsubscribe-auth/ABFFOSIoc8O29CLcpfn2hPWJV_5neQjqks5sil8WgaJpZM4PSVvS> > . > --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/19181 @juliuszsompolski, if you comment the few added lines in the reset() method, you'd see that the test fails with a stack frame very similar to the one you pasted in JIRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/19181 @hvanhovell, @juliuszsompolski, the failure seems unrelated to my work; can you please request a retest? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r138751839 --- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java --- @@ -503,6 +511,47 @@ public void testGetIterator() throws Exception { verifyIntIterator(sorter.getIterator(279), 279, 300); } + @Rule + public ExpectedException exceptions = ExpectedException.none(); + + @Test + public void testOOMDuringSpill() throws Exception { +final UnsafeExternalSorter sorter = newSorter(); +UnsafeInMemorySorter unsafeInMemSorter = sorter.inMemSorter; +for (int i = 0; unsafeInMemSorter.hasSpaceForAnotherRecord(); ++i) { + insertNumber(sorter, i); +} +// todo: this might actually not be zero if pageSize is somehow configured differently, +// so we actually have to compute the expected spill size according to the configured page size +assertEquals(0, sorter.getSpillSize()); +// we expect the next insert to attempt growing the pointerssArray +// first allocation is expected to fail, then a spill is triggered which attempts another allocation +// which also fails and we expect to see this OOM here. +// the original code messed with a released array within the spill code +// and ended up with a failed assertion. +// we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset +memoryManager.markConseqOOM(2); +exceptions.expect(OutOfMemoryError.class); --- End diff -- I'm used to scalatest's Matchers, things could be so much easier if this test was written in Scala... Will rewrite... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r138751518 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java --- @@ -85,7 +85,7 @@ private final LinkedList spillWriters = new LinkedList<>(); // These variables are reset after spilling: - @Nullable private volatile UnsafeInMemorySorter inMemSorter; + @VisibleForTesting @Nullable volatile UnsafeInMemorySorter inMemSorter; --- End diff -- @hvanhovell , is it acceptable to add a method indicating whether the next insert is going to spill? I can make it @VisibleForTesting and prefix it with something like testOnly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r138373142 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java --- @@ -170,6 +170,10 @@ public void free() { public void reset() { if (consumer != null) { consumer.freeArray(array); + array = LongArray.empty; --- End diff -- @hvanhovell , I'm starting to have second thoughts about the special `empty` instance here; I'm afraid the nested call might trigger `freeArray` or something similar on it. Perhaps using null is a better option here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/19181 @ericl , @davies , you guys seem to be the last ones to edit this area of the code, I'd appreciate if you could take a look. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
GitHub user eyalfa opened a pull request: https://github.com/apache/spark/pull/19181 [SPARK-21907][CORE] oom during spill ## What changes were proposed in this pull request? 1. a test reproducing [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907) 2. a fix for the root cause of the issue. `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset` which may trigger another spill, when this happens the `array` member is already de-allocated but still referenced by the code, this causes the nested spill to fail with an NPE in `org.apache.spark.memory.TaskMemoryManager.getPage`. This patch introduces a reproduction in a test case and a fix, the fix simply sets the in-mem sorter's array member to an empty array before actually performing the allocation. This prevents the spilling code from 'touching' the de-allocated array. ## How was this patch tested? introduced a new test case: `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/eyalfa/spark SPARK-21907__oom_during_spill Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19181.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19181 commit c9cbe1a3e3e794e9c1ee54d498301c3f332c3f6c Author: Eyal Farago <e...@nrgene.com> Date: 2017-09-10T10:05:06Z SPARK-21907__oom_during_spill: introduce a reproducing test. commit cc8ccfd3f3956a7652ec82e9748ec56609b19800 Author: Eyal Farago <e...@nrgene.com> Date: 2017-09-10T14:29:32Z SPARK-21907__oom_during_spill: fix the root cause of this bug, improve test by requiring OOM exception thrown from the reset() method. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/18855 @cloud-fan , @vanzin , any idea what happened to this build? It seems to be an environment issue after a successful build (0 failed tests, 'Build step 'Execute shell' marked build as failure'). Can one of you kindly ask Jenkins to retest? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/18855 Funny enough, that's the approach I've chosen. On Aug 15, 2017 19:17, "Marcelo Vanzin" <notificati...@github.com> wrote: > *@vanzin* commented on this pull request. > -- > > In core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala > <https://github.com/apache/spark/pull/18855#discussion_r133232857>: > > > @@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE >super.fetchBlockSync(host, port, execId, blockId) > } >} > + > + def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) { > > It would be probably easier to propagate the chunk size as a SparkConf > entry that is not documented. But up to you guys. > > — > You are receiving this because you authored the thread. > Reply to this email directly, view it on GitHub > <https://github.com/apache/spark/pull/18855#discussion_r133232857>, or mute > the thread > <https://github.com/notifications/unsubscribe-auth/ABFFOZXLge3svlsrRWDJnt_xImYb6cTjks5sYcSVgaJpZM4OukZQ> > . > --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r133180495 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE super.fetchBlockSync(host, port, execId, blockId) } } + + def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) { --- End diff -- yes, currently working on: 1. parameterizing `DiskStore` and `DiskStoreSuite` 2. revert the tests in `BlockManagerSuite` 3. revert the 6gb change in sbt --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r133144224 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE super.fetchBlockSync(host, port, execId, blockId) } } + + def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) { --- End diff -- @cloud-fan , @vanzin , taking the 'parameterized approach', I'd remove most of the tests from `BlockManagerSuite` as they'd require propagating this parameter to too many subsystems. so, I'm going to modify `DiskStore` and `DiskStoreSuite` to use such a parameter, I'm not sure about leaving a test-case in `BlockManagerSuite` that tests `DISK_ONLY` persistence, what do you guys think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r133141988 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE super.fetchBlockSync(host, port, execId, blockId) } } + + def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) { --- End diff -- @cloud-fan I know, it even gets worse when using the `===` operator. I'm currently exploring the second direction pointed by @vanzin , introducing a test-only configuration key to configure the max page size --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r133135659 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE super.fetchBlockSync(host, port, execId, blockId) } } + + def testGetOrElseUpdateForLargeBlock(storageLevel: StorageLevel) { --- End diff -- @vanzin , I've measured, test cases times range from 7-25 seconds on my laptop. point well taken :sunglasses: --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r132978316 --- Diff: project/SparkBuild.scala --- @@ -790,7 +790,7 @@ object TestSettings { javaOptions in Test ++= System.getProperties.asScala.filter(_._1.startsWith("spark")) .map { case (k,v) => s"-D$k=$v" }.toSeq, javaOptions in Test += "-ea", -javaOptions in Test ++= "-Xmx3g -Xss4096k" +javaOptions in Test ++= "-Xmx6g -Xss4096k" --- End diff -- @cloud-fan , let's wait few hours and see what the other guys CCed for this (the last ones to edit the build) have to say about this. if they are also worried or do not comment I'll revert this. I must say I'm reluctant to revert these tests as I personally believe that lack of such tests contributed to spark's 2GB issues, including this one. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/18855 @cloud-fan I think I found the sbt setting that controlled max heap size for forked tests, I've increased it from 3g to 6g. cc: @srowen, @vanzin and @a-roberts you guys seem to be the last ones to update this area in sbt, please review. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/18855 Agreed, though I can't really understand it... Ran SBT locally with --mem 12000 and provided the magical jvm flag that prints jvm provided CLI args, it seems SBT is indeed running with 12gb and the test is still failing - even when running this test only. Does SBT fork before executing tests? If so how does it configure the jvm running the tests? On Aug 9, 2017 08:29, "Wenchen Fan" <notificati...@github.com> wrote: > since this PR only focus on DiskStore, shall we remove the new tests in > BlockManagerSuite? Seems the OOM only happens in BlockManagerSuite > > — > You are receiving this because you authored the thread. > Reply to this email directly, view it on GitHub > <https://github.com/apache/spark/pull/18855#issuecomment-321156406>, or mute > the thread > <https://github.com/notifications/unsubscribe-auth/ABFFOZ8ahiQXd_dce4M6UG-Wib8ebaBTks5sWUOmgaJpZM4OukZQ> > . > --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/18855 @cloud-fan, any idea how much memory is allocated for running sbt? currently one of my newly introduced tests fails on OOM during kryo serialization... it's actually a bit weird as this is the test that stores it in memory deserialized objects, notice that the test doesn't really generate a true 2GB large dataset as the iterator returns the same array on every `next()` call, so basically the underlying buffer (actually buffers as it used chunked byte buffer) used by kryo is the first materialization of such a large piece of data. sbt execution of this suite seems to stop on the first error so I can't tell if the 'memory deserialized' version of this test would have passed. please advise, is it possible to increase sbt's process heap size? or should I remove/comment/ignore these tests? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes fails fo...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/18855 @cloud-fan , can you please give the 'ok to test'? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r131914883 --- Diff: core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala --- @@ -92,6 +92,31 @@ class DiskStoreSuite extends SparkFunSuite { assert(diskStore.getSize(blockId) === 0L) } + test("blocks larger than 2gb") { +val conf = new SparkConf() +val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true) +val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf)) + +val mb = 1024 * 1024 +val gb = 1024L * mb + +val blockId = BlockId("rdd_1_2") +diskStore.put(blockId) { chan => + val arr = new Array[Byte](mb) + for { +_ <- 0 until 2048 + } { +val buf = ByteBuffer.wrap(arr) +while (buf.hasRemaining()) { + chan.write(buf) +} + } +} + +val blockData = diskStore.getBytes(blockId) +assert(blockData.size == 2 * gb) --- End diff -- possible, will fix. I guess I aimed for the lowest possible failing value --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r131913940 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE super.fetchBlockSync(host, port, execId, blockId) } } + + def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) { +store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1") +def mkBlobs() = { + val rng = new java.util.Random(42) + val buff = new Array[Byte](1024 * 1024) + rng.nextBytes(buff) + Iterator.fill(2 * 1024 + 1) { +buff + } +} +val res1 = store.getOrElseUpdate( + RDDBlockId(42, 0), + storageLevel, + implicitly[ClassTag[Array[Byte]]], + mkBlobs _ +) +withClue(res1) { + assert(res1.isLeft) + assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall { +case (a, b) => + a != null && +b != null && +a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq + }) +} +val getResult = store.get(RDDBlockId(42, 0)) +withClue(getResult) { + assert(getResult.isDefined) + assert(getResult.get.data.zipAll(mkBlobs(), null, null).forall { +case (a, b) => + a != null && +b != null && +a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq + }) +} +val getBlockRes = store.getBlockData(RDDBlockId(42, 0)) +withClue(getBlockRes) { + try { +assert(getBlockRes.size() >= 2 * 1024 * 1024 * 1024) +Utils.tryWithResource(getBlockRes.createInputStream()) { inpStrm => + val iter = store +.serializerManager +.dataDeserializeStream(RDDBlockId(42, 0) + , inpStrm)(implicitly[ClassTag[Array[Byte]]]) + assert(iter.zipAll(mkBlobs(), null, null).forall { +case (a, b) => + a != null && +b != null && +a.asInstanceOf[Array[Byte]].seq == b.asInstanceOf[Array[Byte]].seq + }) +} + } finally { +getBlockRes.release() + } +} + } + + test("getOrElseUpdate > 2gb, storage level = disk only") { --- End diff -- these tests cover more than just the `DiskOnly` 
storage level, they were crafted when I had bigger ambitions of solving the entire 2GB issue :sunglasses: , that was before seeing some ~100-file pull requests being abandoned or rejected. As an aside, these tests also test the entire orchestration done by `BlockManager` when an `RDD` requests a cached partition; notice that these tests intentionally make two calls to the BlockManager in order to simulate both code paths (cache-hit, cache-miss). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r131912886 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE super.fetchBlockSync(host, port, execId, blockId) } } + + def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) { +store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1") +def mkBlobs() = { + val rng = new java.util.Random(42) + val buff = new Array[Byte](1024 * 1024) + rng.nextBytes(buff) + Iterator.fill(2 * 1024 + 1) { +buff + } +} +val res1 = store.getOrElseUpdate( + RDDBlockId(42, 0), + storageLevel, + implicitly[ClassTag[Array[Byte]]], + mkBlobs _ +) +withClue(res1) { + assert(res1.isLeft) + assert(res1.left.get.data.zipAll(mkBlobs(), null, null).forall { +case (a, b) => --- End diff -- can't compare Arrays, you get identity equality which is usually not what you want. hence the `.seq` that forces it to be wrapped with a Seq --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r131912438 --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala --- @@ -1415,6 +1415,79 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE super.fetchBlockSync(host, port, execId, blockId) } } + + def testGetOrElseUpdateForLargeBlock(storageLevel : StorageLevel) { +store = makeBlockManager(6L * 1024 * 1024 * 1024, "exec1") +def mkBlobs() = { + val rng = new java.util.Random(42) + val buff = new Array[Byte](1024 * 1024) + rng.nextBytes(buff) + Iterator.fill(2 * 1024 + 1) { +buff + } +} +val res1 = store.getOrElseUpdate( + RDDBlockId(42, 0), + storageLevel, + implicitly[ClassTag[Array[Byte]]], + mkBlobs _ +) +withClue(res1) { --- End diff -- I think it'd print an Either where left side is a case class with members: iterator (prints as empty/non empty iterator), an enum and number of bytes. right side is an iterator, again this'd print an empty/not-empty iterator. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151] [Block Manager] DiskStore.getBytes f...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r131758308 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -165,6 +147,62 @@ private[spark] class DiskStore( } +private class DiskBlockData( +conf: SparkConf, +file: File, +blockSize: Long) extends BlockData { + + private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") + + override def toInputStream(): InputStream = new FileInputStream(file) + + /** + * Returns a Netty-friendly wrapper for the block's data. + * + * Please see `ManagedBuffer.convertToNetty()` for more details. + */ + override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size) + + override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = { +Utils.tryWithResource(open()) { channel => + var remaining = blockSize + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, Int.MaxValue) +val chunk = allocator(chunkSize.toInt) +remaining -= chunkSize +JavaUtils.readFully(channel, chunk) +chunk.flip() +chunks += chunk + } + new ChunkedByteBuffer(chunks.toArray) +} + } + + override def toByteBuffer(): ByteBuffer = { --- End diff -- @cloud-fan it took me roughly 4 hours, but I looked both at the shuffle code path and at `BlockManager.getRemoteBytes`: it seems the first is robust to large blocks by using Netty's stream capabilities, the latter seems to be broken as it's not using Netty's streaming capabilities and actually tries to copy the result buffer into a heap based buffer. I think this deserves its own JIRA/PR. I think these two places plus the external shuffle server cover most of the relevant use cases (aside from local caching which i believe this PR completes in terms of being 2GB proof). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fa...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r131713763 --- Diff: core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala --- @@ -92,6 +92,31 @@ class DiskStoreSuite extends SparkFunSuite { assert(diskStore.getSize(blockId) === 0L) } + test("blocks larger than 2gb") { +val conf = new SparkConf() +val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true) +val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf)) + +val mb = 1024 * 1024 +val gb = 1024L * mb + +val blockId = BlockId("rdd_1_2") +diskStore.put(blockId) { chan => + val arr = new Array[Byte](mb) + for { +_ <- 0 until 2048 + } { +val buf = ByteBuffer.wrap(arr) +while (buf.hasRemaining()) { + chan.write(buf) +} + } +} + +val blockData = diskStore.getBytes(blockId) --- End diff -- @kiszk, this is the test case I was referring to. I actually introduced it prior to actually (hopefully) fixing the bug in `DiskStore.getBytes`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fa...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r131712834 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -165,6 +147,62 @@ private[spark] class DiskStore( } +private class DiskBlockData( +conf: SparkConf, +file: File, +blockSize: Long) extends BlockData { + + private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") + + override def toInputStream(): InputStream = new FileInputStream(file) + + /** + * Returns a Netty-friendly wrapper for the block's data. + * + * Please see `ManagedBuffer.convertToNetty()` for more details. + */ + override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size) + + override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = { +Utils.tryWithResource(open()) { channel => + var remaining = blockSize + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, Int.MaxValue) +val chunk = allocator(chunkSize.toInt) +remaining -= chunkSize +JavaUtils.readFully(channel, chunk) +chunk.flip() +chunks += chunk + } + new ChunkedByteBuffer(chunks.toArray) +} + } + + override def toByteBuffer(): ByteBuffer = { --- End diff -- indeed. I chose to postpone the failure from `DiskStore.getBytes` to this place as I believe it introduces no regression while still allowing the more common 'streaming'-like use-case. furthermore, I think this plays well with the comment about future deprecation of `org.apache.spark.network.buffer.ManagedBuffer#nioByteBuffer` which seems to be the main reason for `BlockData` exposing the `toByteBuffer` method. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fails for...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/18855 @HyukjinKwon , interesting reading but I couldn't find a concrete reason or solution to the issue. @cloud-fan, I've encountered this bug when working with a disk-persisted RDD; it turned out some of our partitions exceeded the 2GB limit and failed on `DiskStore.getBytes`. Since we're using a specific commercial distro I couldn't test this patch on my use case (we got rid of the large partitions anyway by tuning the number of partitions and tweaking the DAG altogether :sunglasses: ), so I did the next best thing: reproduce the issue in a test case. Please see test "blocks larger than 2gb" in `DiskStoreSuite` in this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fails for...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/18855 @kiszk , fixed styling+readability according to your comments. BTW, any idea why JIRA didn't associate this PR with SPARK-3151? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fa...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r131543591 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -165,6 +147,62 @@ private[spark] class DiskStore( } +private class DiskBlockData( +conf: SparkConf, +file: File, +blockSize: Long) extends BlockData { + + private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") + + override def toInputStream(): InputStream = new FileInputStream(file) + + /** + * Returns a Netty-friendly wrapper for the block's data. + * + * Please see `ManagedBuffer.convertToNetty()` for more details. + */ + override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size) + + override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = { +Utils.tryWithResource(open()) { channel => + var remaining = blockSize + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, Int.MaxValue) +val chunk = allocator(chunkSize.toInt) +remaining -= chunkSize +JavaUtils.readFully(channel, chunk) +chunk.flip() +chunks += chunk + } + new ChunkedByteBuffer(chunks.toArray) +} + } + + override def toByteBuffer(): ByteBuffer = { +require( size < Int.MaxValue --- End diff -- @kiszk , not sure I'm following your comment. This requirement results in an explicit error when one tries to obtain a ByteBuffer larger than java.nio's limitations. The original code used to fail in line 115 when calling `ByteBuffer.allocate` with a block size larger than 2GB; the newer code fails explicitly in this case. ...or do you mean referring to the `blockSize` val rather than the `size` method? I can't really see a difference in that case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [SPARK-3151][Block Manager] DiskStore.getBytes fa...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/18855#discussion_r131543602 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -165,6 +147,62 @@ private[spark] class DiskStore( } +private class DiskBlockData( +conf: SparkConf, +file: File, +blockSize: Long) extends BlockData { + + private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") + + override def toInputStream(): InputStream = new FileInputStream(file) + + /** + * Returns a Netty-friendly wrapper for the block's data. + * + * Please see `ManagedBuffer.convertToNetty()` for more details. + */ + override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size) + + override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = { +Utils.tryWithResource(open()) { channel => + var remaining = blockSize + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, Int.MaxValue) +val chunk = allocator(chunkSize.toInt) +remaining -= chunkSize +JavaUtils.readFully(channel, chunk) +chunk.flip() +chunks += chunk + } + new ChunkedByteBuffer(chunks.toArray) +} + } + + override def toByteBuffer(): ByteBuffer = { +require( size < Int.MaxValue + , s"can't create a byte buffer of size $blockSize" ++ s" since it exceeds Int.MaxValue ${Int.MaxValue}.") +Utils.tryWithResource(open()) { channel => + if (blockSize < minMemoryMapBytes) { +// For small files, directly read rather than memory map. +val buf = ByteBuffer.allocate(blockSize.toInt) +JavaUtils.readFully(channel, buf) +buf.flip() +buf + } else { +channel.map(MapMode.READ_ONLY, 0, file.length) + } +} + } + + override def size: Long = blockSize + + override def dispose(): Unit = {} + +private def open() = new FileInputStream(file).getChannel --- End diff -- will do :sunglasses: --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18855: [Spark 3151][Block Manager] DiskStore.getBytes fails for...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/18855 @rxin, @JoshRosen , @cloud-fan , you seem to be the last guys to touch this class, can you please review? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18855: [Spark 3151][Block Manager] DiskStore.getBytes fa...
GitHub user eyalfa opened a pull request: https://github.com/apache/spark/pull/18855 [Spark 3151][Block Manager] DiskStore.getBytes fails for files larger than 2GB ## What changes were proposed in this pull request? Introduced `DiskBlockData`, a new implementation of `BlockData` representing a whole file. This is somewhat related to [SPARK-6236](https://issues.apache.org/jira/browse/SPARK-6236) as well. This class follows the implementation of `EncryptedBlockData`, just without the encryption. Hence: * it uses `FileInputStream` (todo: encrypted version actually uses `Channels.newInputStream`, not sure if it's the right choice for this) * `toNetty` is implemented in terms of `io.netty.channel.DefaultFileRegion#DefaultFileRegion` * `toByteBuffer` fails for files larger than 2GB (same behavior as the original code, just postponed a bit); it also respects the same configuration keys defined by the original code to choose between memory mapping and a simple file read. (Please fill in changes proposed in this fix) ## How was this patch tested? added test to DiskStoreSuite and MemoryManagerSuite You can merge this pull request into a Git repository by running: $ git pull https://github.com/eyalfa/spark SPARK-3151 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18855.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18855 commit fc3f1d78e14a30dd2f71fc65ec59a2def5c1a0d4 Author: Eyal Farago <e...@nrgene.com> Date: 2017-07-05T13:20:16Z SPARK-6235__take1: introduce a failing test. commit 84687380026a6a3bcded27be517094d3f690c3bb Author: Eyal Farago <e...@nrgene.com> Date: 2017-07-30T20:06:05Z SPARK-6235__add_failing_tests: add failing tests for block manager suite.
commit 15804497a477b8f97c08adfad5f0519504dc82f2 Author: Eyal Farago <e...@nrgene.com> Date: 2017-08-01T17:34:26Z SPARK-6235__add_failing_tests: introduce a new BlockData implementation to represent a disk backed block data. commit c5028f50698c4fe48a06f5dd683dbee42f7e6b2b Author: Eyal Farago <e...@nrgene.com> Date: 2017-08-05T19:57:41Z SPARK-6235__add_failing_tests: styling commit 908c7860688534d0bb77bcbebbd2e006a161fb74 Author: Eyal Farago <e...@nrgene.com> Date: 2017-08-05T19:58:52Z SPARK-6235__add_failing_tests: adapt DiskStoreSuite to the modifications in the tested class. commit 67f4259ca16c3ca7c904c9ccc5de9acbc25d2271 Author: Eyal Farago <e...@nrgene.com> Date: 2017-08-05T20:57:58Z SPARK-6235__add_failing_tests: try to reduce actual memory footprint of the >2gb tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/16043#discussion_r99475332 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala --- @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.scalatest.Matchers + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Range +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types._ + +/** +* SPARK-18601 discusses simplification direct access to complex types creators. +* i.e. {{{create_named_struct(square, `x` * `x`).square}}} can be simplified to {{{`x` * `x`}}}. 
+* sam applies to create_array and create_map +*/ +class ComplexTypesSuite extends PlanTest{ + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("collapse projections", FixedPoint(10), + CollapseProject) :: + Batch("Constant Folding", FixedPoint(10), + NullPropagation(conf), + ConstantFolding, + BooleanSimplification, + SimplifyConditionals, + SimplifyBinaryComparison, + SimplifyCreateStructOps, + SimplifyCreateArrayOps, + SimplifyCreateMapOps) :: Nil + } + + val idAtt = ('id).long.notNull + + lazy val baseOptimizedPlan = Range(1L, 1000L, 1, Some(2), idAtt :: Nil) + + val idRef = baseOptimizedPlan.output.head + + implicit class ComplexTypeDslSupport(e : Expression) { +def getStructField(f : String): GetStructField = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[StructType]) + val structType = e.dataType.asInstanceOf[StructType] + val ord = structType.fieldNames.indexOf(f) + assert(-1 != ord) + GetStructField(e, ord, Some(f)) +} + +def getArrayStructField(f : String) : Expression = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[ArrayType]) + val arrType = e.dataType.asInstanceOf[ArrayType] + assert(arrType.elementType.isInstanceOf[StructType]) + val structType = arrType.elementType.asInstanceOf[StructType] + val ord = structType.fieldNames.indexOf(f) + assert(-1 != ord) + GetArrayStructFields(e, structType(ord), ord, 1, arrType.containsNull) +} + +def getArrayItem(i : Int) : GetArrayItem = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[ArrayType]) + GetArrayItem(e, Literal(i)) +} + +def getMapValue(k : Expression) : Expression = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[MapType]) + val mapType = e.dataType.asInstanceOf[MapType] + assert(k.dataType == mapType.keyType) + GetMapValue(e, k) +} + +def addCaseWhen( cond : Expression, v : Expression) : Expression = { + assert(e.resolved) + assert(e.isInstanceOf[CaseWhen]) + assert(cond.dataType == BooleanType) + assert(v.dataType == e.dataType) + val 
CaseWhen(branches, default) = e + CaseWhen( branches :+ (cond, v), default) +} + } + + test("explicit") { +val rel = baseOptimizedPlan.select( + CreateNamedStruct("att" :: idRef :: Nil).getStructField("att") as "outerAtt" + ) + +assertResult(StructType(StructField("outerAtt", LongType, nullable = false) :: Nil))(rel.schema) + +val optimized = Optimize execute rel + +
[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/16043#discussion_r99475106 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala --- @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.scalatest.Matchers + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Range +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types._ + +/** +* SPARK-18601 discusses simplification direct access to complex types creators. +* i.e. {{{create_named_struct(square, `x` * `x`).square}}} can be simplified to {{{`x` * `x`}}}. 
+* sam applies to create_array and create_map +*/ +class ComplexTypesSuite extends PlanTest{ + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("collapse projections", FixedPoint(10), + CollapseProject) :: + Batch("Constant Folding", FixedPoint(10), + NullPropagation(conf), + ConstantFolding, + BooleanSimplification, + SimplifyConditionals, + SimplifyBinaryComparison, + SimplifyCreateStructOps, + SimplifyCreateArrayOps, + SimplifyCreateMapOps) :: Nil + } + + val idAtt = ('id).long.notNull + + lazy val baseOptimizedPlan = Range(1L, 1000L, 1, Some(2), idAtt :: Nil) + + val idRef = baseOptimizedPlan.output.head + + implicit class ComplexTypeDslSupport(e : Expression) { +def getStructField(f : String): GetStructField = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[StructType]) + val structType = e.dataType.asInstanceOf[StructType] + val ord = structType.fieldNames.indexOf(f) + assert(-1 != ord) + GetStructField(e, ord, Some(f)) +} + +def getArrayStructField(f : String) : Expression = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[ArrayType]) + val arrType = e.dataType.asInstanceOf[ArrayType] + assert(arrType.elementType.isInstanceOf[StructType]) + val structType = arrType.elementType.asInstanceOf[StructType] + val ord = structType.fieldNames.indexOf(f) + assert(-1 != ord) + GetArrayStructFields(e, structType(ord), ord, 1, arrType.containsNull) +} + +def getArrayItem(i : Int) : GetArrayItem = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[ArrayType]) + GetArrayItem(e, Literal(i)) +} + +def getMapValue(k : Expression) : Expression = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[MapType]) + val mapType = e.dataType.asInstanceOf[MapType] + assert(k.dataType == mapType.keyType) + GetMapValue(e, k) +} + +def addCaseWhen( cond : Expression, v : Expression) : Expression = { + assert(e.resolved) + assert(e.isInstanceOf[CaseWhen]) + assert(cond.dataType == BooleanType) + assert(v.dataType == e.dataType) + val 
CaseWhen(branches, default) = e + CaseWhen( branches :+ (cond, v), default) +} + } + + test("explicit") { +val rel = baseOptimizedPlan.select( + CreateNamedStruct("att" :: idRef :: Nil).getStructField("att") as "outerAtt" + ) + +assertResult(StructType(StructField("outerAtt", LongType, nullable = false) :: Nil))(rel.schema) + +val optimized = Optimize execute rel + +
[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/16043#discussion_r99475096 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala --- @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.scalatest.Matchers + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Range +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types._ + +/** +* SPARK-18601 discusses simplification direct access to complex types creators. +* i.e. {{{create_named_struct(square, `x` * `x`).square}}} can be simplified to {{{`x` * `x`}}}. 
+* sam applies to create_array and create_map +*/ +class ComplexTypesSuite extends PlanTest{ + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("collapse projections", FixedPoint(10), + CollapseProject) :: + Batch("Constant Folding", FixedPoint(10), + NullPropagation(conf), + ConstantFolding, + BooleanSimplification, + SimplifyConditionals, + SimplifyBinaryComparison, + SimplifyCreateStructOps, + SimplifyCreateArrayOps, + SimplifyCreateMapOps) :: Nil + } + + val idAtt = ('id).long.notNull + + lazy val baseOptimizedPlan = Range(1L, 1000L, 1, Some(2), idAtt :: Nil) + + val idRef = baseOptimizedPlan.output.head + + implicit class ComplexTypeDslSupport(e : Expression) { +def getStructField(f : String): GetStructField = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[StructType]) + val structType = e.dataType.asInstanceOf[StructType] + val ord = structType.fieldNames.indexOf(f) + assert(-1 != ord) + GetStructField(e, ord, Some(f)) +} + +def getArrayStructField(f : String) : Expression = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[ArrayType]) + val arrType = e.dataType.asInstanceOf[ArrayType] + assert(arrType.elementType.isInstanceOf[StructType]) + val structType = arrType.elementType.asInstanceOf[StructType] + val ord = structType.fieldNames.indexOf(f) + assert(-1 != ord) + GetArrayStructFields(e, structType(ord), ord, 1, arrType.containsNull) +} + +def getArrayItem(i : Int) : GetArrayItem = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[ArrayType]) + GetArrayItem(e, Literal(i)) +} + +def getMapValue(k : Expression) : Expression = { + assert(e.resolved) + assert(e.dataType.isInstanceOf[MapType]) + val mapType = e.dataType.asInstanceOf[MapType] + assert(k.dataType == mapType.keyType) + GetMapValue(e, k) +} + +def addCaseWhen( cond : Expression, v : Expression) : Expression = { + assert(e.resolved) + assert(e.isInstanceOf[CaseWhen]) + assert(cond.dataType == BooleanType) + assert(v.dataType == e.dataType) + val 
CaseWhen(branches, default) = e + CaseWhen( branches :+ (cond, v), default) +} + } + + test("explicit") { +val rel = baseOptimizedPlan.select( + CreateNamedStruct("att" :: idRef :: Nil).getStructField("att") as "outerAtt" + ) + +assertResult(StructType(StructField("outerAtt", LongType, nullable = false) :: Nil))(rel.schema) + +val optimized = Optimize execute rel + +
[GitHub] spark issue #16043: [SPARK-18601][SQL] Simplify Create/Get complex expressio...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/16043 @HyukjinKwon, @hvanhovell, are you familiar with this build failure? seems to be unrelated to my specific build... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/16043#discussion_r98660310 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +/** +* push down operations into [[CreateNamedStructLike]]. +*/ +object SimplifyCreateStructOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp { + // push down field extraction + case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) => +createNamedStructLike.valExprs(ordinal) +} + } +} + +/** +* push down operations into [[CreateArray]]. 
+*/ +object SimplifyCreateArrayOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp { + // push down field selection (array of structs) + case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) => +// instead f selecting the field on the entire array, +// select it from each member of the array. +// pushing down the operation this way open other optimizations opportunities +// (i.e. struct(...,x,...).x) +CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name + // push down item selection. + case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) => +// instead of creating the array and then selecting one row, +// remove array creation altgether. +if (idx >= 0 && idx < elems.size) { + // valid index + elems(idx) +} else { + // out of bounds, mimic the runtime behavior and return null + Cast(Literal(null), ga.dataType) --- End diff -- yep --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/16043#discussion_r98660085 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -293,6 +293,12 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { // from that. Note that CaseWhen.branches should never be empty, and as a result the // headOption (rather than head) added above is just an extra (and unnecessary) safeguard. branches.head._2 + + case e @ CaseWhen(branches, _) if branches.exists(_._1 == Literal(true)) => +// a branc with a TRue condition eliminates all following branches, +// these branches can be pruned away +val (h, t) = branches.span(_._1 != Literal(true)) +CaseWhen( h :+ t.head, None) --- End diff -- sorry, please explain --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16043: [SPARK-18601][SQL] Simplify Create/Get complex expressio...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/16043 @hvanhovell, can you figure out what failed the build? It seems all tests passed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16043: [SPARK-18601][SQL] Simplify Create/Get complex expressio...
Github user eyalfa commented on the issue: https://github.com/apache/spark/pull/16043 @hvanhovell, currently in the process of refactoring. Luckily I've introduced some tests in the initial sprint, so I've caught my mistakes and refreshed my mind around the assumptions this code makes. Regarding nulls, I'm using coalesce to add the first positive match after undetermined ones; when one of the undetermined values is null, coalesce will override it with the positive match's value. This was handled in the normalize method of class ClassifiedEntries; part of the refactoring involves removing this class and simplifying the decision making around matches. Will push later today after running scalastyle on my branch. On Jan 27, 2017 18:40, "Herman van Hovell" <notificati...@github.com> wrote: > *@hvanhovell* commented on this pull request. > -- > > In sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ > optimizer/ComplexTypes.scala <https://github.com/apache/spark/pull/16043>: > > > +} > +} > +val res = res1.normalize( requestedKey ) > +res > + } > + > + override def apply(plan: LogicalPlan): LogicalPlan = { > +plan.transformExpressionsUp { > + // attempt to unfold 'constant' key extraction, > + // this enables other optimizations to take place. > + case gmv @ GetMapValue(cm @ CreateMap(elems), key) => > +val kvs = cm.keys.zip(cm.values) > +val classifiedEntries = classifyEntries(kvs, key) > +classifiedEntries match { > + case ClassifiedEntries(Seq(), _, None) => Literal.create(null, gmv.dataType) > + case ClassifiedEntries(`elems`, _, None) => gmv > > It is fine, it just needs a little clarification. > > — > You are receiving this because you were mentioned. > Reply to this email directly, view it on GitHub > <https://github.com/apache/spark/pull/16043>, or mute the thread > <https://github.com/notifications/unsubscribe-auth/ABFFOV2D--1Wea5bmhkZVNwjmAZXxrtLks5rWh3lgaJpZM4K-Obo> > .
> --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/16043#discussion_r98222460 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +/** +* push down operations into [[CreateNamedStructLike]]. +*/ +object SimplifyCreateStructOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp { + // push down field extraction + case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) => +createNamedStructLike.valExprs(ordinal) +} + } +} + +/** +* push down operations into [[CreateArray]]. 
+*/ +object SimplifyCreateArrayOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp { + // push down field selection (array of structs) + case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) => +CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name + // push down item selection. + case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) => +if (idx >= 0 && idx < elems.size) { + elems(idx) +} else { + Cast(Literal(null), ga.dataType) +} +} + } +} + +/** +* push down operations into [[CreateMap]]. +*/ +object SimplifyCreateMapOps extends Rule[LogicalPlan] { + object ComparisonResult extends Enumeration { +val PositiveMatch = Value +val NegativeMatch = Value +val UnDetermined = Value + } + + def compareKeys(k1 : Expression, k2 : Expression) : ComparisonResult.Value = { +(k1, k2) match { + case (x, y) if x.semanticEquals(y) => ComparisonResult.PositiveMatch + // make surethis is null safe, especially when datatypes differ + // is this even possible? + case (_ : Literal, _ : Literal) => ComparisonResult.NegativeMatch + case _ => ComparisonResult.UnDetermined +} + } + + case class ClassifiedEntries( +undetermined : Seq[Expression], +nullable : Boolean, +firstPositive : Option[Expression]) { +def normalize(k : Expression) : ClassifiedEntries = this match { + /** + * when we have undetermined matches that might bproduce a null value, + * we can't separate a positive match and use [[Coalesce]] to choose the final result. + * so we 'hide' the positive match as an undetermined match. 
+ */ + case ClassifiedEntries(u, true, Some(p)) if u.nonEmpty => +ClassifiedEntries(u ++ Seq(k, p), true, None) + case _ => this +} + } + + def classifyEntries(mapEntries : Seq[(Expression, Expression)], + requestedKey : Expression) : ClassifiedEntries = { +val res1 = mapEntries.foldLeft(ClassifiedEntries(Seq.empty, nullable = false, None)) { + case (prev @ ClassifiedEntries(_, _, Some(_)), _) => prev + case (ClassifiedEntries(prev, nullable, None), (k, v)) => +compareKeys(k, requestedKey) match { + case ComparisonResult.UnDetermined => +val vIsNullable = v.nullable +val nextNullbale = nullable || vIsNullable +ClassifiedEntries(prev ++ Seq(k, v), nullable = nextNullbale, None) + case ComparisonResult.NegativeMatch => ClassifiedEntries(prev, nullable, None) + case ComparisonResult.PositiveMatch => Classified
[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/16043#discussion_r98216632 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +/** +* push down operations into [[CreateNamedStructLike]]. +*/ +object SimplifyCreateStructOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp { + // push down field extraction + case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) => +createNamedStructLike.valExprs(ordinal) +} + } +} + +/** +* push down operations into [[CreateArray]]. 
+*/ +object SimplifyCreateArrayOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp { + // push down field selection (array of structs) + case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) => +CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name + // push down item selection. + case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) => +if (idx >= 0 && idx < elems.size) { + elems(idx) +} else { + Cast(Literal(null), ga.dataType) +} +} + } +} + +/** +* push down operations into [[CreateMap]]. +*/ +object SimplifyCreateMapOps extends Rule[LogicalPlan] { + object ComparisonResult extends Enumeration { +val PositiveMatch = Value +val NegativeMatch = Value +val UnDetermined = Value + } + + def compareKeys(k1 : Expression, k2 : Expression) : ComparisonResult.Value = { +(k1, k2) match { + case (x, y) if x.semanticEquals(y) => ComparisonResult.PositiveMatch + // make surethis is null safe, especially when datatypes differ + // is this even possible? + case (_ : Literal, _ : Literal) => ComparisonResult.NegativeMatch + case _ => ComparisonResult.UnDetermined +} + } + + case class ClassifiedEntries( +undetermined : Seq[Expression], +nullable : Boolean, +firstPositive : Option[Expression]) { +def normalize(k : Expression) : ClassifiedEntries = this match { + /** + * when we have undetermined matches that might bproduce a null value, + * we can't separate a positive match and use [[Coalesce]] to choose the final result. + * so we 'hide' the positive match as an undetermined match. 
+ */ + case ClassifiedEntries(u, true, Some(p)) if u.nonEmpty => +ClassifiedEntries(u ++ Seq(k, p), true, None) + case _ => this +} + } + + def classifyEntries(mapEntries : Seq[(Expression, Expression)], + requestedKey : Expression) : ClassifiedEntries = { +val res1 = mapEntries.foldLeft(ClassifiedEntries(Seq.empty, nullable = false, None)) { + case (prev @ ClassifiedEntries(_, _, Some(_)), _) => prev + case (ClassifiedEntries(prev, nullable, None), (k, v)) => +compareKeys(k, requestedKey) match { + case ComparisonResult.UnDetermined => +val vIsNullable = v.nullable +val nextNullbale = nullable || vIsNullable +ClassifiedEntries(prev ++ Seq(k, v), nullable = nextNullbale, None) + case ComparisonResult.NegativeMatch => ClassifiedEntries(prev, nullable, None) + case ComparisonResult.PositiveMatch => Classified
[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/16043#discussion_r98193617 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +/** +* push down operations into [[CreateNamedStructLike]]. +*/ +object SimplifyCreateStructOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp { + // push down field extraction + case GetStructField(createNamedStructLike : CreateNamedStructLike, ordinal, _) => +createNamedStructLike.valExprs(ordinal) +} + } +} + +/** +* push down operations into [[CreateArray]]. 
+*/ +object SimplifyCreateArrayOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp { + // push down field selection (array of structs) + case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) => +CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name + // push down item selection. + case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) => +if (idx >= 0 && idx < elems.size) { + elems(idx) +} else { + Cast(Literal(null), ga.dataType) +} +} + } +} + +/** +* push down operations into [[CreateMap]]. +*/ +object SimplifyCreateMapOps extends Rule[LogicalPlan] { + object ComparisonResult extends Enumeration { +val PositiveMatch = Value +val NegativeMatch = Value +val UnDetermined = Value + } + + def compareKeys(k1 : Expression, k2 : Expression) : ComparisonResult.Value = { +(k1, k2) match { + case (x, y) if x.semanticEquals(y) => ComparisonResult.PositiveMatch + // make surethis is null safe, especially when datatypes differ --- End diff -- I think my comment relates to the case where one literal is an integer while the other is a long; not sure if this is possible (as stated in the next comment), but if it is, does null : Int equal null : Long? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/16043#discussion_r98192298 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala --- @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.scalatest.Matchers + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{Coalesce, CreateArray, CreateMap, CreateNamedStruct, Expression, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField, Literal} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Range +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types._ + +/** +* Created by eyalf on 11/4/2016. +* SPARK-18601 discusses simplification direct access to complex types creators. +* i.e. {{{create_named_struct(square, `x` * `x`).square}}} can be simplified to {{{`x` * `x`}}}. 
+* sam applies to create_array and create_map +*/ +class ComplexTypesSuite extends PlanTest with Matchers{ + + object Optimize extends RuleExecutor[LogicalPlan] { +val batches = + Batch("collapse projections", FixedPoint(10), + CollapseProject) :: + Batch("Constant Folding", FixedPoint(10), + NullPropagation, + ConstantFolding, + BooleanSimplification, + SimplifyConditionals, + SimplifyCreateStructOps, + SimplifyCreateArrayOps, + SimplifyCreateMapOps) :: Nil + } + + val idAtt = ('id).long.notNull + + lazy val baseOptimizedPlan = Range(1L, 1000L, 1, Some(2), idAtt :: Nil) + + val idRef = baseOptimizedPlan.output.head + + +// val idRefColumn = Column("id") +// val struct1RefColumn = Column("struct1") + + implicit class ComplexTypeDslSupport(e : Expression) { +def getStructField(f : String): GetStructField = { + e should be ('resolved) + e.dataType should be (a[StructType]) --- End diff -- will change to assertion --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/16043#discussion_r98192112 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Cast, Coalesce, CreateArray, CreateMap, CreateNamedStructLike, Expression, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField, IntegerLiteral, Literal} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +/** +* push down operations into [[CreateNamedStructLike]]. +*/ +object SimplifyCreateStructOps extends Rule[LogicalPlan]{ + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp{ + // push down field extraction + case GetStructField(createNamedStructLike : CreateNamedStructLike, ordinal, _) => +createNamedStructLike.valExprs(ordinal) +} + } +} + +/** +* push down operations into [[CreateArray]]. 
+*/ +object SimplifyCreateArrayOps extends Rule[LogicalPlan]{ + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp{ + // push down field selection (array of structs) + case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) => +def getStructField(elem : Expression) = { + GetStructField(elem, ordinal, Some(field.name)) +} +CreateArray(elems.map(getStructField)) + // push down item selection. + case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) => +if (idx >= 0 && idx < elems.size) { + elems(idx) +} else { + Cast(Literal(null), ga.dataType) +} +} + } +} + +/** +* push down operations into [[CreateMap]]. +*/ +object SimplifyCreateMapOps extends Rule[LogicalPlan]{ + object ComparisonResult extends Enumeration { +val PositiveMatch = Value +val NegativeMatch = Value +val UnDetermined = Value + } + + def compareKeys(k1 : Expression, k2 : Expression) : ComparisonResult.Value = { +(k1, k2) match { + case (x, y) if x.semanticEquals(y) => ComparisonResult.PositiveMatch + // make surethis is null safe, especially when datatypes differ + // is this even possible? + case (_ : Literal, _ : Literal) => ComparisonResult.NegativeMatch + case _ => ComparisonResult.UnDetermined +} + } + + case class ClassifiedEntries(undetermined : Seq[Expression], + nullable : Boolean, + firstPositive : Option[Expression]) { +def normalize( k : Expression ) : ClassifiedEntries = this match { + /** + * when we have undetermined matches that might bproduce a null value, + * we can't separate a positive match and use [[Coalesce]] to choose the final result. + * so we 'hide' the positive match as an undetermined match. 
+ */ + case ClassifiedEntries( u, true, Some(p)) if u.nonEmpty => +ClassifiedEntries(u ++ Seq(k, p), true, None) + case _ => this +} + } + + def classifyEntries(mapEntries : Seq[(Expression, Expression)], + requestedKey : Expression) : ClassifiedEntries = { +val res1 = mapEntries.foldLeft(ClassifiedEntries(Seq.empty, nullable = false, None)) { + case (prev @ ClassifiedEntries(_, _, Some(_)), _) => prev + case (ClassifiedEntries(prev, nullable, None), (k, v)) => +compareKeys(k, requestedKey) match { + case ComparisonResult.UnDetermined => +val vIsNullable = v.n
[GitHub] spark pull request #16043: [SPARK-18601][SQL] Simplify Create/Get complex ex...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/16043#discussion_r98191785 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ComplexTypes.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +/** +* push down operations into [[CreateNamedStructLike]]. +*/ +object SimplifyCreateStructOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp { + // push down field extraction + case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) => +createNamedStructLike.valExprs(ordinal) +} + } +} + +/** +* push down operations into [[CreateArray]]. 
+*/ +object SimplifyCreateArrayOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { +plan.transformExpressionsUp { + // push down field selection (array of structs) + case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) => +CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name + // push down item selection. + case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) => +if (idx >= 0 && idx < elems.size) { + elems(idx) +} else { + Cast(Literal(null), ga.dataType) +} +} + } +} + +/** +* push down operations into [[CreateMap]]. +*/ +object SimplifyCreateMapOps extends Rule[LogicalPlan] { + object ComparisonResult extends Enumeration { +val PositiveMatch = Value +val NegativeMatch = Value +val UnDetermined = Value + } + + def compareKeys(k1 : Expression, k2 : Expression) : ComparisonResult.Value = { +(k1, k2) match { + case (x, y) if x.semanticEquals(y) => ComparisonResult.PositiveMatch + // make surethis is null safe, especially when datatypes differ + // is this even possible? + case (_ : Literal, _ : Literal) => ComparisonResult.NegativeMatch + case _ => ComparisonResult.UnDetermined +} + } + + case class ClassifiedEntries( +undetermined : Seq[Expression], +nullable : Boolean, +firstPositive : Option[Expression]) { +def normalize(k : Expression) : ClassifiedEntries = this match { + /** + * when we have undetermined matches that might bproduce a null value, + * we can't separate a positive match and use [[Coalesce]] to choose the final result. + * so we 'hide' the positive match as an undetermined match. 
+ */ + case ClassifiedEntries(u, true, Some(p)) if u.nonEmpty => +ClassifiedEntries(u ++ Seq(k, p), true, None) + case _ => this +} + } + + def classifyEntries(mapEntries : Seq[(Expression, Expression)], + requestedKey : Expression) : ClassifiedEntries = { +val res1 = mapEntries.foldLeft(ClassifiedEntries(Seq.empty, nullable = false, None)) { + case (prev @ ClassifiedEntries(_, _, Some(_)), _) => prev + case (ClassifiedEntries(prev, nullable, None), (k, v)) => +compareKeys(k, requestedKey) match { + case ComparisonResult.UnDetermined => +val vIsNullable = v.nullable +val nextNullbale = nullable || vIsNullable +ClassifiedEntries(prev ++ Seq(k, v), nullable = nextNullbale, None) + case ComparisonResult.NegativeMatch => ClassifiedEntries(prev, nullable, None) + case ComparisonResult.PositiveMatch => Classified