Repository: spark
Updated Branches:
  refs/heads/master 49b1504fe -> b87825310


[SPARK-9692] Remove SqlNewHadoopRDD's generated Tuple2 and InterruptibleIterator.

A small performance optimization – we don't need to generate a Tuple2 and 
then immediately discard the key. We also don't need an extra wrapper from 
InterruptibleIterator.

Author: Reynold Xin <r...@databricks.com>

Closes #8000 from rxin/SPARK-9692 and squashes the following commits:

1d4d0b3 [Reynold Xin] [SPARK-9692] Remove SqlNewHadoopRDD's generated Tuple2 and InterruptibleIterator.
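
The shape of the change, as a minimal standalone sketch (the RecordReaderLike trait and the isInterrupted callback below are illustrative stand-ins, not Spark or Hadoop APIs): the iterator yields each value directly and folds the task-kill check into hasNext, so neither the per-record Tuple2 nor the InterruptibleIterator wrapper is needed.

// Minimal sketch of the optimization described above; RecordReaderLike and
// isInterrupted are hypothetical stand-ins for Hadoop's RecordReader and
// Spark's TaskContext, not real Spark/Hadoop APIs.
trait RecordReaderLike[V] {
  def nextKeyValue(): Boolean
  def getCurrentValue: V
}

class ValueIterator[V](reader: RecordReaderLike[V], isInterrupted: () => Boolean)
  extends Iterator[V] {

  private[this] var finished = false
  private[this] var havePair = false

  override def hasNext: Boolean = {
    // The kill-flag check lives inside the iterator itself, so no separate
    // InterruptibleIterator wrapper is needed around it.
    if (isInterrupted()) throw new InterruptedException("task killed")
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()
      havePair = !finished
    }
    !finished
  }

  override def next(): V = {
    if (!hasNext) throw new java.util.NoSuchElementException("End of stream")
    havePair = false
    // Return the value directly instead of allocating a (key, value) Tuple2
    // whose key the caller would immediately discard.
    reader.getCurrentValue
  }
}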


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8782531
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8782531
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8782531

Branch: refs/heads/master
Commit: b87825310ac87485672868bf6a9ed01d154a3626
Parents: 49b1504
Author: Reynold Xin <r...@databricks.com>
Authored: Thu Aug 6 18:25:38 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Aug 6 18:25:38 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/SqlNewHadoopRDD.scala  | 44 ++++++++------------
 .../spark/sql/parquet/ParquetRelation.scala     |  3 +-
 2 files changed, 18 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b8782531/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index 35e44cb..6a95e44 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -26,14 +26,12 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.{Partition => SparkPartition, _}
-import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -60,18 +58,16 @@ private[spark] class SqlNewHadoopPartition(
  *    and the executor side to the shared Hadoop Configuration.
  *
  * Note: This is RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
- * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In future, this functionality will be
- * folded into core.
+ * changes based on [[org.apache.spark.rdd.HadoopRDD]].
  */
-private[spark] class SqlNewHadoopRDD[K, V](
+private[spark] class SqlNewHadoopRDD[V: ClassTag](
     @transient sc : SparkContext,
     broadcastedConf: Broadcast[SerializableConfiguration],
     @transient initDriverSideJobFuncOpt: Option[Job => Unit],
     initLocalJobFuncOpt: Option[Job => Unit],
-    inputFormatClass: Class[_ <: InputFormat[K, V]],
-    keyClass: Class[K],
+    inputFormatClass: Class[_ <: InputFormat[Void, V]],
     valueClass: Class[V])
-  extends RDD[(K, V)](sc, Nil)
+  extends RDD[V](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
 
@@ -120,8 +116,8 @@ private[spark] class SqlNewHadoopRDD[K, V](
 
   override def compute(
       theSplit: SparkPartition,
-      context: TaskContext): InterruptibleIterator[(K, V)] = {
-    val iter = new Iterator[(K, V)] {
+      context: TaskContext): Iterator[V] = {
+    val iter = new Iterator[V] {
       val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
       val conf = getConf(isDriverSide = false)
@@ -154,17 +150,20 @@ private[spark] class SqlNewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      private var reader = format.createRecordReader(
+      private[this] var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener(context => close())
-      var havePair = false
-      var finished = false
-      var recordsSinceMetricsUpdate = 0
+
+      private[this] var havePair = false
+      private[this] var finished = false
 
       override def hasNext: Boolean = {
+        if (context.isInterrupted) {
+          throw new TaskKilledException
+        }
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
           if (finished) {
@@ -178,7 +177,7 @@ private[spark] class SqlNewHadoopRDD[K, V](
         !finished
       }
 
-      override def next(): (K, V) = {
+      override def next(): V = {
         if (!hasNext) {
           throw new java.util.NoSuchElementException("End of stream")
         }
@@ -186,7 +185,7 @@ private[spark] class SqlNewHadoopRDD[K, V](
         if (!finished) {
           inputMetrics.incRecordsRead(1)
         }
-        (reader.getCurrentKey, reader.getCurrentValue)
+        reader.getCurrentValue
       }
 
       private def close() {
@@ -212,23 +211,14 @@ private[spark] class SqlNewHadoopRDD[K, V](
             }
           }
         } catch {
-          case e: Exception => {
+          case e: Exception =>
             if (!Utils.inShutdown()) {
               logWarning("Exception in RecordReader.close()", e)
             }
-          }
         }
       }
     }
-    new InterruptibleIterator(context, iter)
-  }
-
-  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
-  @DeveloperApi
-  def mapPartitionsWithInputSplit[U: ClassTag](
-      f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
-      preservesPartitioning: Boolean = false): RDD[U] = {
-    new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+    iter
   }
 
   override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b8782531/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index b4337a4..29c388c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -291,7 +291,6 @@ private[sql] class ParquetRelation(
         initDriverSideJobFuncOpt = Some(setInputPaths),
         initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
         inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
-        keyClass = classOf[Void],
         valueClass = classOf[InternalRow]) {
 
         val cacheMetadata = useMetadataCache
@@ -328,7 +327,7 @@ private[sql] class ParquetRelation(
            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
           }
         }
-      }.values.asInstanceOf[RDD[Row]]  // type erasure hack to pass RDD[InternalRow] as RDD[Row]
+      }.asInstanceOf[RDD[Row]]  // type erasure hack to pass RDD[InternalRow] as RDD[Row]
     }
   }
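
As background for the "type erasure hack" comment in the diff above: generic type parameters are erased at runtime on the JVM, so casting an RDD[InternalRow] to RDD[Row] with asInstanceOf is unchecked and always succeeds; it is only safe because downstream code consumes the elements appropriately. A minimal standalone sketch of the same pattern, using plain Scala lists rather than Spark classes:

// Standalone illustration of a cast that relies on type erasure;
// List stands in for RDD here, this is not Spark code.
object ErasureCastSketch {
  def main(args: Array[String]): Unit = {
    val ints: List[Int] = List(1, 2, 3)

    // After erasure, List[Int] and List[String] share the same runtime class,
    // so this cast succeeds at runtime even though the element types differ.
    val strings: List[String] = ints.asInstanceOf[List[String]]

    // Safe as long as no element is actually used as a String.
    println(strings.size)

    // A failure would only surface at the use site, as a ClassCastException:
    // val s: String = strings.head
  }
}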
 

