[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2018-09-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r218743585
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel}
+
+private[columnar] class CachedColumnarRDD(
+    @transient private var _sc: SparkContext,
+    private var dataRDD: RDD[CachedBatch],
+    private[columnar] val containsPartitionMetadata: Boolean,
+    expectedStorageLevel: StorageLevel)
+  extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) {
+
+  override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = {
+    firstParent.iterator(split, context)
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+    CachedColumnarRDD.allMetadataFetched.remove(id)
+    CachedColumnarRDD.rddIdToMetadata.remove(id)
+    super.unpersist(blocking)
+  }
+
+  override protected def getPartitions: Array[Partition] = dataRDD.partitions
+
+  override private[spark] def getOrCompute(split: Partition, context: TaskContext):
+  Iterator[CachedBatch] = {
+    val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index)
+    val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute
+    SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(_ =>
+      superGetOrCompute(split, context)
+    ).getOrElse {
+      val batchIter = superGetOrCompute(split, context)
+      if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) {
+        val cachedBatch = batchIter.next()
--- End diff --

Assert `!batchIter.hasNext` here; you expect this partition to contain a single batch.
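
A minimal sketch of what the suggested guard could look like inside `getOrCompute`, assuming the single-batch-per-partition invariant this PR aims for (all names come from the diff above; the assertion message is illustrative):

```scala
      val batchIter = superGetOrCompute(split, context)
      if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) {
        val cachedBatch = batchIter.next()
        // Guard the invariant: a metadata-bearing partition should hold exactly one batch.
        assert(!batchIter.hasNext, "expected a single CachedBatch per partition")
        SparkEnv.get.blockManager.putSingle(metadataBlockId, cachedBatch.stats, expectedStorageLevel)
        new InterruptibleIterator[CachedBatch](context, Iterator(cachedBatch))
      } else {
        batchIter
      }
```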


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2018-09-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r218716111
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel}
+
+private[columnar] class CachedColumnarRDD(
+    @transient private var _sc: SparkContext,
+    private var dataRDD: RDD[CachedBatch],
+    private[columnar] val containsPartitionMetadata: Boolean,
+    expectedStorageLevel: StorageLevel)
+  extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) {
+
+  override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = {
+    firstParent.iterator(split, context)
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+    CachedColumnarRDD.allMetadataFetched.remove(id)
+    CachedColumnarRDD.rddIdToMetadata.remove(id)
+    super.unpersist(blocking)
+  }
+
+  override protected def getPartitions: Array[Partition] = dataRDD.partitions
+
+  override private[spark] def getOrCompute(split: Partition, context: TaskContext):
+  Iterator[CachedBatch] = {
--- End diff --

Can this be avoided by maintaining two (zipped) RDDs, one of `CachedBatch`es and the other holding only the stats? Could that approach avoid the need for a specialized block type for managing metadata?
Correct me if I'm wrong, but the first time the stats are accessed, your approach performs a full scan to extract them (this happens in the SQL code), so a second RDD along the lines of `batches.map(_.stats).persist` should give the same behavior, right?
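
A rough sketch of that alternative, purely to illustrate the suggestion (the `CachedBatch` type is the PR's; the helper name and storage-level parameter are made up here):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.storage.StorageLevel

// Derive and persist a stats-only RDD next to the cached batches, instead of
// writing per-partition stats into a dedicated metadata block type.
def buildStatsRDD(
    cachedBatches: RDD[CachedBatch],
    storageLevel: StorageLevel): RDD[InternalRow] = {
  // One small InternalRow of column stats per batch; pruning could evaluate this
  // cheap RDD first and only touch the data partitions that survive the filters.
  cachedBatches.map(_.stats).persist(storageLevel)
}
```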


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2018-09-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r218714493
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel}
+
+private[columnar] class CachedColumnarRDD(
+    @transient private var _sc: SparkContext,
+    private var dataRDD: RDD[CachedBatch],
+    private[columnar] val containsPartitionMetadata: Boolean,
+    expectedStorageLevel: StorageLevel)
+  extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) {
+
+  override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = {
+    firstParent.iterator(split, context)
+  }
+
+  override def unpersist(blocking: Boolean = true): this.type = {
+    CachedColumnarRDD.allMetadataFetched.remove(id)
+    CachedColumnarRDD.rddIdToMetadata.remove(id)
+    super.unpersist(blocking)
+  }
+
+  override protected def getPartitions: Array[Partition] = dataRDD.partitions
+
+  override private[spark] def getOrCompute(split: Partition, context: TaskContext):
+  Iterator[CachedBatch] = {
+    val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index)
+    val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute
+    SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(_ =>
+      superGetOrCompute(split, context)
+    ).getOrElse {
+      val batchIter = superGetOrCompute(split, context)
+      if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) {
+        val cachedBatch = batchIter.next()
+        SparkEnv.get.blockManager.putSingle(metadataBlockId, cachedBatch.stats,
+          expectedStorageLevel)
+        new InterruptibleIterator[CachedBatch](context, Iterator(cachedBatch))
+      } else {
+        batchIter
+      }
+    }
+  }
+}
+
+private[columnar] object CachedColumnarRDD {
+
+  private val rddIdToMetadata = new ConcurrentHashMap[Int, mutable.ArraySeq[Option[InternalRow]]]()
--- End diff --

Could these be moved to become members of the RDD class? This looks like a map of `this -> some property`, which in that case could be an instance member.
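
For illustration only, a hedged sketch of what instance-level state could look like inside `CachedColumnarRDD` (field names are made up; the PR's companion-object maps are keyed by `id`, which is exactly what instance state would make implicit):

```scala
  // Hypothetical instance members replacing the companion-object maps keyed by this
  // RDD's id. @transient keeps the driver-side bookkeeping from being shipped to
  // executors with the RDD closure.
  @transient private lazy val partitionMetadata: mutable.ArraySeq[Option[InternalRow]] =
    mutable.ArraySeq.fill(partitions.length)(None)

  @transient @volatile private var allMetadataFetched: Boolean = false
```

With instance state, the explicit cleanup in `unpersist()` would presumably no longer be needed, since the maps go away with the RDD object itself.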


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2018-09-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r218705219
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
+  private def doFilterCachedBatches(
+      rdd: RDD[CachedBatch],
+      partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
+    val schemaIndex = partitionStatsSchema.zipWithIndex
+    rdd.mapPartitionsWithIndex {
+      case (partitionIndex, cachedBatches) =>
+        if (inMemoryPartitionPruningEnabled) {
+          cachedBatches.filter { cachedBatch =>
+            val partitionFilter = newPredicate(
--- End diff --

Can this be pulled up out of `usePartitionLevelMetadata`? It seems like you're constructing the predicate per record.
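
A hedged sketch of the suggested hoist, reusing the names from the diff above (the logging branch around the negative case is omitted here for brevity):

```scala
    rdd.mapPartitionsWithIndex { case (partitionIndex, cachedBatches) =>
      if (inMemoryPartitionPruningEnabled) {
        // Build and initialize the predicate once per partition, not once per batch.
        val partitionFilter = newPredicate(
          partitionFilters.reduceOption(And).getOrElse(Literal(true)),
          partitionStatsSchema)
        partitionFilter.initialize(partitionIndex)
        cachedBatches.filter(cachedBatch => partitionFilter.eval(cachedBatch.stats))
      } else {
        cachedBatches
      }
    }
```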


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2018-09-19 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r218704390
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -52,6 +52,68 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
+
+private[columnar] class CachedBatchIterator(
+    rowIterator: Iterator[InternalRow],
+    output: Seq[Attribute],
+    batchSize: Int,
+    useCompression: Boolean,
+    batchStats: LongAccumulator,
+    singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] {
+
+  def next(): CachedBatch = {
+    val columnBuilders = output.map { attribute =>
+      ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
+    }.toArray
+
+    var rowCount = 0
+    var totalSize = 0L
+
+    val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow],
+      rowCount: Int, size: Long) => {
+      if (!singleBatch) {
+        rowIter.hasNext && rowCount < batchSize && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE
+      } else {
+        rowIter.hasNext
--- End diff --

doesn't this run the risk of OOM for large partitions?


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2017-12-06 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r155392065
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
+  private def doFilterCachedBatches(
+      rdd: RDD[CachedBatch],
+      partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
+    val schemaIndex = partitionStatsSchema.zipWithIndex
+    rdd.mapPartitionsWithIndex {
+      case (partitionIndex, cachedBatches) =>
+        if (inMemoryPartitionPruningEnabled) {
+          cachedBatches.filter { cachedBatch =>
+            val partitionFilter = newPredicate(
+              partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+              partitionStatsSchema)
+            partitionFilter.initialize(partitionIndex)
+            if (!partitionFilter.eval(cachedBatch.stats)) {
--- End diff --

@sadikovi this while loop is building a `CachedBatch`; it just decides when to seal the current batch and start building the next one. So either way you need to go through all the records in the partition.


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2017-12-04 Thread sadikovi
Github user sadikovi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r154848470
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
+  private def doFilterCachedBatches(
+      rdd: RDD[CachedBatch],
+      partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
+    val schemaIndex = partitionStatsSchema.zipWithIndex
+    rdd.mapPartitionsWithIndex {
+      case (partitionIndex, cachedBatches) =>
+        if (inMemoryPartitionPruningEnabled) {
+          cachedBatches.filter { cachedBatch =>
+            val partitionFilter = newPredicate(
+              partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+              partitionStatsSchema)
+            partitionFilter.initialize(partitionIndex)
+            if (!partitionFilter.eval(cachedBatch.stats)) {
--- End diff --

All good, no need to change. I was trying to understand the code, so my question refers to statistics collection overall, not to the changes in this PR. The link points to a condition (which exists before this PR) that could potentially cause the iterator to exit before all of its records are exhausted, so statistics would be only partially collected, which might affect any filtering that uses such statistics, though this is quite possibly handled later, or it may be a purely theoretical case.


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2017-12-04 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r154845669
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
+  private def doFilterCachedBatches(
+      rdd: RDD[CachedBatch],
+      partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
+    val schemaIndex = partitionStatsSchema.zipWithIndex
+    rdd.mapPartitionsWithIndex {
+      case (partitionIndex, cachedBatches) =>
+        if (inMemoryPartitionPruningEnabled) {
+          cachedBatches.filter { cachedBatch =>
+            val partitionFilter = newPredicate(
+              partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+              partitionStatsSchema)
+            partitionFilter.initialize(partitionIndex)
+            if (!partitionFilter.eval(cachedBatch.stats)) {
--- End diff --

I might not understand your proposal well... are you trying to simplify the logic in https://github.com/apache/spark/pull/19810/files#diff-5fc188468d3066580ea9a766114b8f1dR74? That would make the code simpler but degrade the pruning effect here.


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2017-12-04 Thread CodingCat
Github user CodingCat commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r154844579
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -52,6 +52,68 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
 
+private[columnar] class CachedBatchIterator(
+    rowIterator: Iterator[InternalRow],
+    output: Seq[Attribute],
+    batchSize: Int,
+    useCompression: Boolean,
+    batchStats: LongAccumulator,
+    singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] {
+
+  def next(): CachedBatch = {
+    val columnBuilders = output.map { attribute =>
+      ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
+    }.toArray
+
+    var rowCount = 0
+    var totalSize = 0L
+
+    val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow],
--- End diff --

To make getting the partition stats easier, we construct only one `CachedBatch` per partition when the functionality proposed in this PR is enabled. `singleBatch` distinguishes between the enabled and disabled scenarios by introducing different while-loop termination conditions, which keeps the rest of the code reusable.
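
Restating the two termination modes as a sketch (the helper name `keepBuilding` is made up; the condition bodies follow the diff, and this is only an illustration of the explanation above, not the exact code):

```scala
  // With singleBatchPerPartition the builder stops only when the row iterator is
  // exhausted, so each partition yields exactly one CachedBatch whose stats describe
  // the whole partition; otherwise batches are also capped by row count and byte size.
  def keepBuilding(
      singleBatch: Boolean,
      rowIter: Iterator[InternalRow],
      rowCount: Int,
      totalSize: Long): Boolean = {
    if (singleBatch) {
      rowIter.hasNext
    } else {
      rowIter.hasNext && rowCount < batchSize && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE
    }
  }
```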


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2017-12-04 Thread sadikovi
Github user sadikovi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r154815012
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
 
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
+  private def doFilterCachedBatches(
+      rdd: RDD[CachedBatch],
+      partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
+    val schemaIndex = partitionStatsSchema.zipWithIndex
+    rdd.mapPartitionsWithIndex {
+      case (partitionIndex, cachedBatches) =>
+        if (inMemoryPartitionPruningEnabled) {
+          cachedBatches.filter { cachedBatch =>
+            val partitionFilter = newPredicate(
+              partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+              partitionStatsSchema)
+            partitionFilter.initialize(partitionIndex)
+            if (!partitionFilter.eval(cachedBatch.stats)) {
--- End diff --

Are there any issues with discarding a partition based on statistics that could be partially computed (e.g. when the total size in bytes of a partition iterator is larger than the configurable batch size), as per https://github.com/apache/spark/pull/19810/files#diff-5fc188468d3066580ea9a766114b8f1dR74?

Would it be beneficial to record such a situation by logging it, and still include such a partition when statistics are partially computed and the filters evaluate to false, or to discard all statistics when some of the partitions hit this situation? Thanks!


---




[GitHub] spark pull request #19810: [SPARK-22599][SQL] In-Memory Table Pruning withou...

2017-12-04 Thread sadikovi
Github user sadikovi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19810#discussion_r154814573
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -52,6 +52,68 @@ object InMemoryRelation {
 private[columnar]
 case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
 
+private[columnar] class CachedBatchIterator(
+    rowIterator: Iterator[InternalRow],
+    output: Seq[Attribute],
+    batchSize: Int,
+    useCompression: Boolean,
+    batchStats: LongAccumulator,
+    singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] {
+
+  def next(): CachedBatch = {
+    val columnBuilders = output.map { attribute =>
+      ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
+    }.toArray
+
+    var rowCount = 0
+    var totalSize = 0L
+
+    val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow],
--- End diff --

@CodingCat Could you explain to me what `singleBatch` means here? I cannot get my head around it :) Thanks!


---
