[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r218743585

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.columnar
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark._
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel}
    +
    +private[columnar] class CachedColumnarRDD(
    +    @transient private var _sc: SparkContext,
    +    private var dataRDD: RDD[CachedBatch],
    +    private[columnar] val containsPartitionMetadata: Boolean,
    +    expectedStorageLevel: StorageLevel)
    +  extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) {
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = {
    +    firstParent.iterator(split, context)
    +  }
    +
    +  override def unpersist(blocking: Boolean = true): this.type = {
    +    CachedColumnarRDD.allMetadataFetched.remove(id)
    +    CachedColumnarRDD.rddIdToMetadata.remove(id)
    +    super.unpersist(blocking)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = dataRDD.partitions
    +
    +  override private[spark] def getOrCompute(split: Partition, context: TaskContext):
    +      Iterator[CachedBatch] = {
    +    val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index)
    +    val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute
    +    SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(_ =>
    +      superGetOrCompute(split, context)
    +    ).getOrElse {
    +      val batchIter = superGetOrCompute(split, context)
    +      if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) {
    +        val cachedBatch = batchIter.next()
    --- End diff --

    assert `!batchIter.hasNext`, you expect this partition to contain a single batch

---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
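The assertion suggested in the review comment on `batchIter.next()` could look roughly like the sketch below. It uses a plain Scala `Iterator` rather than Spark types, and `onlyElement` is a hypothetical helper name, not something from the PR.

```scala
object SingleBatchCheck {
  // Hypothetical helper: returns the only element of `iter` and fails fast
  // if the iterator is empty or yields more than one element.
  def onlyElement[A](iter: Iterator[A]): A = {
    require(iter.hasNext, "expected exactly one batch, found none")
    val first = iter.next()
    // Mirrors the suggested check: nothing may follow the first batch.
    assert(!iter.hasNext, "expected exactly one batch, found more")
    first
  }
}
```

In the reviewed code the equivalent check would sit right after `val cachedBatch = batchIter.next()`, so that a partition with several batches is reported instead of silently losing all but the first.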
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r218716111

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.columnar
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark._
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel}
    +
    +private[columnar] class CachedColumnarRDD(
    +    @transient private var _sc: SparkContext,
    +    private var dataRDD: RDD[CachedBatch],
    +    private[columnar] val containsPartitionMetadata: Boolean,
    +    expectedStorageLevel: StorageLevel)
    +  extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) {
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = {
    +    firstParent.iterator(split, context)
    +  }
    +
    +  override def unpersist(blocking: Boolean = true): this.type = {
    +    CachedColumnarRDD.allMetadataFetched.remove(id)
    +    CachedColumnarRDD.rddIdToMetadata.remove(id)
    +    super.unpersist(blocking)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = dataRDD.partitions
    +
    +  override private[spark] def getOrCompute(split: Partition, context: TaskContext):
    +      Iterator[CachedBatch] = {
    --- End diff --

    can this be avoided by maintaining two (zipped) RDDs? one of `CachedBatch`s and the other holding only the stats? can this approach avoid the need for a specialized block type for managing metadata?

    correct me if i'm wrong, but the first time the stats are accessed, your approach performs a full scan to extract the stats (happens in the sql code), so having a second RDD which is something like `batches.map(_.stats).persist` should give the same behavior, right?
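The two-collection idea from the comment above can be modeled without Spark. This is a minimal sketch, assuming simplified `Batch` and `Stats` types (hypothetical stand-ins for `CachedBatch` and its stats row) and plain `Seq`s in place of RDDs; `statsOf` plays the role of `batches.map(_.stats)`, and persisting is left out entirely.

```scala
object ZippedStatsSketch {
  // Hypothetical stand-ins for CachedBatch and its stats row.
  final case class Stats(min: Int, max: Int)
  final case class Batch(rows: Vector[Int], stats: Stats)

  // Builds one batch per partition and records min/max (rows must be non-empty).
  def buildBatch(rows: Vector[Int]): Batch = Batch(rows, Stats(rows.min, rows.max))

  // Stats-only view of the batches, kept index-aligned with them.
  def statsOf(batches: Seq[Batch]): Seq[Stats] = batches.map(_.stats)

  // Returns the indices of partitions whose [min, max] range may contain `value`.
  def candidatePartitions(stats: Seq[Stats], value: Int): Seq[Int] =
    stats.zipWithIndex.collect { case (s, i) if s.min <= value && value <= s.max => i }
}
```

Because the stats view is index-aligned with the batches, pruning only needs the small collection, and the surviving indices select which batches to read. Whether this removes the need for a dedicated metadata block type in Spark is the open question the comment raises; the sketch does not answer it.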
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r218714493

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.columnar
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark._
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel}
    +
    +private[columnar] class CachedColumnarRDD(
    +    @transient private var _sc: SparkContext,
    +    private var dataRDD: RDD[CachedBatch],
    +    private[columnar] val containsPartitionMetadata: Boolean,
    +    expectedStorageLevel: StorageLevel)
    +  extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) {
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = {
    +    firstParent.iterator(split, context)
    +  }
    +
    +  override def unpersist(blocking: Boolean = true): this.type = {
    +    CachedColumnarRDD.allMetadataFetched.remove(id)
    +    CachedColumnarRDD.rddIdToMetadata.remove(id)
    +    super.unpersist(blocking)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = dataRDD.partitions
    +
    +  override private[spark] def getOrCompute(split: Partition, context: TaskContext):
    +      Iterator[CachedBatch] = {
    +    val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index)
    +    val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute
    +    SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(_ =>
    +      superGetOrCompute(split, context)
    +    ).getOrElse {
    +      val batchIter = superGetOrCompute(split, context)
    +      if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) {
    +        val cachedBatch = batchIter.next()
    +        SparkEnv.get.blockManager.putSingle(metadataBlockId, cachedBatch.stats,
    +          expectedStorageLevel)
    +        new InterruptibleIterator[CachedBatch](context, Iterator(cachedBatch))
    +      } else {
    +        batchIter
    +      }
    +    }
    +  }
    +}
    +
    +private[columnar] object CachedColumnarRDD {
    +
    +  private val rddIdToMetadata = new ConcurrentHashMap[Int, mutable.ArraySeq[Option[InternalRow]]]()
    --- End diff --

    could these be moved to become a member of the RDD class? seems like a map of this->some.property, in this case can be made an instance member.
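To illustrate the suggestion above, here is a small sketch of per-instance metadata storage that replaces a global map keyed by an id. `MetadataOwner` is a hypothetical stand-in for the RDD class and uses only the Java standard library. Note that in Spark, instance fields of an RDD are serialized with tasks unless marked `@transient`, so whether this works for the PR depends on where the metadata is read, which the comment leaves open.

```scala
import java.util.concurrent.atomic.AtomicReferenceArray

// Hypothetical stand-in for an RDD that owns its per-partition metadata,
// instead of registering it in a companion-object map keyed by its id.
final class MetadataOwner[M <: AnyRef](numPartitions: Int) {
  // One slot per partition; a null slot means "not fetched yet".
  private val metadata = new AtomicReferenceArray[M](numPartitions)

  def put(partition: Int, m: M): Unit = metadata.set(partition, m)

  def get(partition: Int): Option[M] = Option(metadata.get(partition))

  // Replaces the explicit `remove(id)` calls a global map would need.
  def clear(): Unit =
    (0 until numPartitions).foreach(i => metadata.set(i, null.asInstanceOf[M]))
}
```

With instance-level state, cleanup is tied to the lifetime of the owning object, so a forgotten `remove(id)` cannot leak entries in a shared map.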
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r218705219

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
       private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
    +  private def doFilterCachedBatches(
    +      rdd: RDD[CachedBatch],
    +      partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
    +    val schemaIndex = partitionStatsSchema.zipWithIndex
    +    rdd.mapPartitionsWithIndex {
    +      case (partitionIndex, cachedBatches) =>
    +        if (inMemoryPartitionPruningEnabled) {
    +          cachedBatches.filter { cachedBatch =>
    +            val partitionFilter = newPredicate(
    --- End diff --

    can this be pulled up out of `usePartitionLevelMetadata`? seems like you're constructing the predicate per record
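The cost the comment above points at can be shown with plain Scala iterators. In this sketch, `newPredicate` is a hypothetical stand-in for an expensive predicate build (it is not Spark's `newPredicate`), and the `built` counter exists only to make the difference observable.

```scala
object PredicateHoisting {
  // Counts predicate constructions, for illustration only.
  var built = 0

  // Hypothetical stand-in for an expensive predicate build.
  def newPredicate(threshold: Int): Int => Boolean = {
    built += 1
    stat => stat >= threshold
  }

  // Builds a fresh predicate for every element, as the reviewed code appears to do.
  def filterPerRecord(stats: Iterator[Int], threshold: Int): Iterator[Int] =
    stats.filter(s => newPredicate(threshold)(s))

  // Builds the predicate once per partition and reuses it for every element.
  def filterHoisted(stats: Iterator[Int], threshold: Int): Iterator[Int] = {
    val predicate = newPredicate(threshold)
    stats.filter(predicate)
  }
}
```

Both variants keep the same elements; only the number of predicate constructions differs, growing with the number of records in the first case and staying at one per partition in the second.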
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user eyalfa commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r218704390

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
    @@ -52,6 +52,68 @@ object InMemoryRelation {
     private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
    +private[columnar] class CachedBatchIterator(
    +    rowIterator: Iterator[InternalRow],
    +    output: Seq[Attribute],
    +    batchSize: Int,
    +    useCompression: Boolean,
    +    batchStats: LongAccumulator,
    +    singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] {
    +
    +  def next(): CachedBatch = {
    +    val columnBuilders = output.map { attribute =>
    +      ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
    +    }.toArray
    +
    +    var rowCount = 0
    +    var totalSize = 0L
    +
    +    val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow],
    +      rowCount: Int, size: Long) => {
    +      if (!singleBatch) {
    +        rowIter.hasNext && rowCount < batchSize && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE
    +      } else {
    +        rowIter.hasNext
    --- End diff --

    doesn't this run the risk of OOM for large partitions?
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r155392065

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
       private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
    +  private def doFilterCachedBatches(
    +      rdd: RDD[CachedBatch],
    +      partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
    +    val schemaIndex = partitionStatsSchema.zipWithIndex
    +    rdd.mapPartitionsWithIndex {
    +      case (partitionIndex, cachedBatches) =>
    +        if (inMemoryPartitionPruningEnabled) {
    +          cachedBatches.filter { cachedBatch =>
    +            val partitionFilter = newPredicate(
    +              partitionFilters.reduceOption(And).getOrElse(Literal(true)),
    +              partitionStatsSchema)
    +            partitionFilter.initialize(partitionIndex)
    +            if (!partitionFilter.eval(cachedBatch.stats)) {
    --- End diff --

    @sadikovi this while loop builds a `CachedBatch`; it only decides when to seal the building window of the current `CachedBatch` and start the next one. So, either way, you need to go through all records in the partition.
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user sadikovi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r154848470

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
       private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
    +  private def doFilterCachedBatches(
    +      rdd: RDD[CachedBatch],
    +      partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
    +    val schemaIndex = partitionStatsSchema.zipWithIndex
    +    rdd.mapPartitionsWithIndex {
    +      case (partitionIndex, cachedBatches) =>
    +        if (inMemoryPartitionPruningEnabled) {
    +          cachedBatches.filter { cachedBatch =>
    +            val partitionFilter = newPredicate(
    +              partitionFilters.reduceOption(And).getOrElse(Literal(true)),
    +              partitionStatsSchema)
    +            partitionFilter.initialize(partitionIndex)
    +            if (!partitionFilter.eval(cachedBatch.stats)) {
    --- End diff --

    All good, no need to change. I was trying to understand the code, so my question would be referring to statistics collection overall, not changes in this PR. Link points to a condition (exists before this PR) that could potentially result in exiting iterator before exhausting all records in it, so statistics would be partially collected, which might affect any filtering that uses such statistics - though it is quite possibly handled later, or a theoretical use case.
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r154845669

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
       private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
    +  private def doFilterCachedBatches(
    +      rdd: RDD[CachedBatch],
    +      partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
    +    val schemaIndex = partitionStatsSchema.zipWithIndex
    +    rdd.mapPartitionsWithIndex {
    +      case (partitionIndex, cachedBatches) =>
    +        if (inMemoryPartitionPruningEnabled) {
    +          cachedBatches.filter { cachedBatch =>
    +            val partitionFilter = newPredicate(
    +              partitionFilters.reduceOption(And).getOrElse(Literal(true)),
    +              partitionStatsSchema)
    +            partitionFilter.initialize(partitionIndex)
    +            if (!partitionFilter.eval(cachedBatch.stats)) {
    --- End diff --

    I might not understand your proposal well... are you trying to simplify the logic in https://github.com/apache/spark/pull/19810/files#diff-5fc188468d3066580ea9a766114b8f1dR74? It would make the code simpler but degrade the pruning effect here.
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r154844579

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
    @@ -52,6 +52,68 @@ object InMemoryRelation {
     private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
    +private[columnar] class CachedBatchIterator(
    +    rowIterator: Iterator[InternalRow],
    +    output: Seq[Attribute],
    +    batchSize: Int,
    +    useCompression: Boolean,
    +    batchStats: LongAccumulator,
    +    singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] {
    +
    +  def next(): CachedBatch = {
    +    val columnBuilders = output.map { attribute =>
    +      ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
    +    }.toArray
    +
    +    var rowCount = 0
    +    var totalSize = 0L
    +
    +    val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow],
    --- End diff --

    To make getting partition stats easier, we construct only one `CachedBatch` per partition when the functionality proposed in this PR is enabled. `singleBatch` distinguishes the scenarios in which the functionality is enabled or disabled by selecting a different while-loop termination condition, so the rest of the code stays reusable.
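The two loop conditions described above can be sketched with plain Scala collections. This is a simplified model, not the PR's `CachedBatchIterator`: `batches` is a hypothetical function, batches are plain `Vector`s, and the byte-size limit from the original condition is left out.

```scala
object BatchingSketch {
  // Groups `rows` into batches. With singleBatch = true the loop stops only
  // when the input is exhausted, so the whole partition becomes one batch.
  def batches[A](rows: Iterator[A], batchSize: Int, singleBatch: Boolean): Iterator[Vector[A]] =
    new Iterator[Vector[A]] {
      def hasNext: Boolean = rows.hasNext

      def next(): Vector[A] = {
        val builder = Vector.newBuilder[A]
        var rowCount = 0
        // Continue condition: the only part that differs between the two modes.
        def keepGoing: Boolean =
          if (singleBatch) rows.hasNext
          else rows.hasNext && rowCount < batchSize
        while (keepGoing) {
          builder += rows.next()
          rowCount += 1
        }
        builder.result()
      }
    }
}
```

In single-batch mode the builder holds every row of the partition at once, which is the memory concern raised elsewhere in this thread for large partitions.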
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user sadikovi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r154815012

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
       private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
    +  private def doFilterCachedBatches(
    +      rdd: RDD[CachedBatch],
    +      partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
    +    val schemaIndex = partitionStatsSchema.zipWithIndex
    +    rdd.mapPartitionsWithIndex {
    +      case (partitionIndex, cachedBatches) =>
    +        if (inMemoryPartitionPruningEnabled) {
    +          cachedBatches.filter { cachedBatch =>
    +            val partitionFilter = newPredicate(
    +              partitionFilters.reduceOption(And).getOrElse(Literal(true)),
    +              partitionStatsSchema)
    +            partitionFilter.initialize(partitionIndex)
    +            if (!partitionFilter.eval(cachedBatch.stats)) {
    --- End diff --

    Are there any issues with discarding a partition based on statistics that could be partially computed (e.g. when the total size in bytes of a partition iterator is larger than the configurable batch size), as per https://github.com/apache/spark/pull/19810/files#diff-5fc188468d3066580ea9a766114b8f1dR74?

    Would it be beneficial to record such a situation by logging it, and still include such a partition when statistics are partially computed and filters evaluate to false, or to discard all statistics when some of the partitions hit this situation? Thanks!
[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...
Github user sadikovi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r154814573

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
    @@ -52,6 +52,68 @@ object InMemoryRelation {
     private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
    +private[columnar] class CachedBatchIterator(
    +    rowIterator: Iterator[InternalRow],
    +    output: Seq[Attribute],
    +    batchSize: Int,
    +    useCompression: Boolean,
    +    batchStats: LongAccumulator,
    +    singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] {
    +
    +  def next(): CachedBatch = {
    +    val columnBuilders = output.map { attribute =>
    +      ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
    +    }.toArray
    +
    +    var rowCount = 0
    +    var totalSize = 0L
    +
    +    val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow],
    --- End diff --

    @CodingCat Could you explain to me what `singleBatch` means here? I cannot get my head around it :) Thanks!