Repository: spark
Updated Branches:
  refs/heads/master ed47b1e66 -> 58b4e4f88


[SPARK-11787][SPARK-11883][SQL][FOLLOW-UP] Cleanup for this patch.

This mainly moves SqlNewHadoopRDD to the sql package. There is some state that is
shared between core and sql, and I've left that in core. This allows some other
associated minor cleanup.

Author: Nong Li <n...@databricks.com>

Closes #9845 from nongli/spark-11787.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/58b4e4f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/58b4e4f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/58b4e4f8

Branch: refs/heads/master
Commit: 58b4e4f88a330135c4cec04a30d24ef91bc61d91
Parents: ed47b1e
Author: Nong Li <n...@databricks.com>
Authored: Fri Nov 20 15:30:53 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Nov 20 15:30:53 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |   6 +-
 .../org/apache/spark/rdd/SqlNewHadoopRDD.scala  | 317 -------------------
 .../apache/spark/rdd/SqlNewHadoopRDDState.scala |  41 +++
 .../sql/catalyst/expressions/UnsafeRow.java     |  59 +++-
 .../catalyst/expressions/InputFileName.scala    |   6 +-
 .../parquet/UnsafeRowParquetRecordReader.java   |  14 +
 .../scala/org/apache/spark/sql/SQLConf.scala    |   5 +
 .../execution/datasources/SqlNewHadoopRDD.scala | 299 +++++++++++++++++
 .../datasources/parquet/ParquetRelation.scala   |   2 +-
 .../parquet/ParquetFilterSuite.scala            |  43 +--
 .../datasources/parquet/ParquetIOSuite.scala    |  19 ++
 11 files changed, 453 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/58b4e4f8/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 7db5834..f37c95b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -215,8 +215,8 @@ class HadoopRDD[K, V](
 
       // Sets the thread local variable for the file's name
       split.inputSplit.value match {
-        case fs: FileSplit => 
SqlNewHadoopRDD.setInputFileName(fs.getPath.toString)
-        case _ => SqlNewHadoopRDD.unsetInputFileName()
+        case fs: FileSplit => 
SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
+        case _ => SqlNewHadoopRDDState.unsetInputFileName()
       }
 
       // Find a function that will return the FileSystem bytes read by this 
thread. Do this before
@@ -256,7 +256,7 @@ class HadoopRDD[K, V](
 
       override def close() {
         if (reader != null) {
-          SqlNewHadoopRDD.unsetInputFileName()
+          SqlNewHadoopRDDState.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that 
we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 
when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to 
non-deterministic

http://git-wip-us.apache.org/repos/asf/spark/blob/58b4e4f8/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
deleted file mode 100644
index 4d17633..0000000
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{Utils, SerializableConfiguration, 
ShutdownHookManager}
-import org.apache.spark.{Partition => SparkPartition, _}
-
-import scala.reflect.ClassTag
-
-
-private[spark] class SqlNewHadoopPartition(
-    rddId: Int,
-    val index: Int,
-    rawSplit: InputSplit with Writable)
-  extends SparkPartition {
-
-  val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
-  override def hashCode(): Int = 41 * (41 + rddId) + index
-}
-
-/**
- * An RDD that provides core functionality for reading data stored in Hadoop 
(e.g., files in HDFS,
- * sources in HBase, or S3), using the new MapReduce API 
(`org.apache.hadoop.mapreduce`).
- * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three 
additions.
- * 1. A shared broadcast Hadoop Configuration.
- * 2. An optional closure `initDriverSideJobFuncOpt` that set configurations 
at the driver side
- *    to the shared Hadoop Configuration.
- * 3. An optional closure `initLocalJobFuncOpt` that set configurations at 
both the driver side
- *    and the executor side to the shared Hadoop Configuration.
- *
- * Note: This is RDD is basically a cloned version of 
[[org.apache.spark.rdd.NewHadoopRDD]] with
- * changes based on [[org.apache.spark.rdd.HadoopRDD]].
- */
-private[spark] class SqlNewHadoopRDD[V: ClassTag](
-    sc : SparkContext,
-    broadcastedConf: Broadcast[SerializableConfiguration],
-    @transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
-    initLocalJobFuncOpt: Option[Job => Unit],
-    inputFormatClass: Class[_ <: InputFormat[Void, V]],
-    valueClass: Class[V])
-  extends RDD[V](sc, Nil)
-  with SparkHadoopMapReduceUtil
-  with Logging {
-
-  protected def getJob(): Job = {
-    val conf: Configuration = broadcastedConf.value.value
-    // "new Job" will make a copy of the conf. Then, it is
-    // safe to mutate conf properties with initLocalJobFuncOpt
-    // and initDriverSideJobFuncOpt.
-    val newJob = new Job(conf)
-    initLocalJobFuncOpt.map(f => f(newJob))
-    newJob
-  }
-
-  def getConf(isDriverSide: Boolean): Configuration = {
-    val job = getJob()
-    if (isDriverSide) {
-      initDriverSideJobFuncOpt.map(f => f(job))
-    }
-    SparkHadoopUtil.get.getConfigurationFromJobContext(job)
-  }
-
-  private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    formatter.format(new Date())
-  }
-
-  @transient protected val jobId = new JobID(jobTrackerId, id)
-
-  // If true, enable using the custom RecordReader for parquet. This only 
works for
-  // a subset of the types (no complex types).
-  protected val enableUnsafeRowParquetReader: Boolean =
-      sc.conf.getBoolean("spark.parquet.enableUnsafeRowRecordReader", true)
-
-  override def getPartitions: Array[SparkPartition] = {
-    val conf = getConf(isDriverSide = true)
-    val inputFormat = inputFormatClass.newInstance
-    inputFormat match {
-      case configurable: Configurable =>
-        configurable.setConf(conf)
-      case _ =>
-    }
-    val jobContext = newJobContext(conf, jobId)
-    val rawSplits = inputFormat.getSplits(jobContext).toArray
-    val result = new Array[SparkPartition](rawSplits.size)
-    for (i <- 0 until rawSplits.size) {
-      result(i) =
-        new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit 
with Writable])
-    }
-    result
-  }
-
-  override def compute(
-      theSplit: SparkPartition,
-      context: TaskContext): Iterator[V] = {
-    val iter = new Iterator[V] {
-      val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
-      logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = getConf(isDriverSide = false)
-
-      val inputMetrics = context.taskMetrics
-        .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
-
-      // Sets the thread local variable for the file's name
-      split.serializableHadoopSplit.value match {
-        case fs: FileSplit => 
SqlNewHadoopRDD.setInputFileName(fs.getPath.toString)
-        case _ => SqlNewHadoopRDD.unsetInputFileName()
-      }
-
-      // Find a function that will return the FileSystem bytes read by this 
thread. Do this before
-      // creating RecordReader, because RecordReader's constructor might read 
some bytes
-      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
-        split.serializableHadoopSplit.value match {
-          case _: FileSplit | _: CombineFileSplit =>
-            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
-          case _ => None
-        }
-      }
-      inputMetrics.setBytesReadCallback(bytesReadCallback)
-
-      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, 
split.index, 0)
-      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
-      val format = inputFormatClass.newInstance
-      format match {
-        case configurable: Configurable =>
-          configurable.setConf(conf)
-        case _ =>
-      }
-      private[this] var reader: RecordReader[Void, V] = null
-
-      /**
-        * If the format is ParquetInputFormat, try to create the optimized 
RecordReader. If this
-        * fails (for example, unsupported schema), try with the normal reader.
-        * TODO: plumb this through a different way?
-        */
-      if (enableUnsafeRowParquetReader &&
-          format.getClass.getName == 
"org.apache.parquet.hadoop.ParquetInputFormat") {
-        // TODO: move this class to sql.execution and remove this.
-        reader = Utils.classForName(
-          
"org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader")
-          .newInstance().asInstanceOf[RecordReader[Void, V]]
-        try {
-          reader.initialize(split.serializableHadoopSplit.value, 
hadoopAttemptContext)
-        } catch {
-          case e: Exception => reader = null
-        }
-      }
-
-      if (reader == null) {
-        reader = format.createRecordReader(
-          split.serializableHadoopSplit.value, hadoopAttemptContext)
-        reader.initialize(split.serializableHadoopSplit.value, 
hadoopAttemptContext)
-      }
-
-      // Register an on-task-completion callback to close the input stream.
-      context.addTaskCompletionListener(context => close())
-
-      private[this] var havePair = false
-      private[this] var finished = false
-
-      override def hasNext: Boolean = {
-        if (context.isInterrupted) {
-          throw new TaskKilledException
-        }
-        if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
-          if (finished) {
-            // Close and release the reader here; close() will also be called 
when the task
-            // completes, but for tasks that read from many files, it helps to 
release the
-            // resources early.
-            close()
-          }
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): V = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        if (!finished) {
-          inputMetrics.incRecordsRead(1)
-        }
-        reader.getCurrentValue
-      }
-
-      private def close() {
-        if (reader != null) {
-          SqlNewHadoopRDD.unsetInputFileName()
-          // Close the reader and release it. Note: it's very important that 
we don't close the
-          // reader more than once, since that exposes us to MAPREDUCE-5918 
when running against
-          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to 
non-deterministic
-          // corruption issues when reading compressed input.
-          try {
-            reader.close()
-          } catch {
-            case e: Exception =>
-              if (!ShutdownHookManager.inShutdown()) {
-                logWarning("Exception in RecordReader.close()", e)
-              }
-          } finally {
-            reader = null
-          }
-          if (bytesReadCallback.isDefined) {
-            inputMetrics.updateBytesRead()
-          } else if 
(split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-                     
split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-            // If we can't get the bytes read from the FS stats, fall back to 
the split size,
-            // which may be inaccurate.
-            try {
-              
inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-            } catch {
-              case e: java.io.IOException =>
-                logWarning("Unable to get input size to set InputMetrics for 
task", e)
-            }
-          }
-        }
-      }
-    }
-    iter
-  }
-
-  override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {
-    val split = 
hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value
-    val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
-      case Some(c) =>
-        try {
-          val infos = 
c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
-          Some(HadoopRDD.convertSplitLocationInfo(infos))
-        } catch {
-          case e : Exception =>
-            logDebug("Failed to use InputSplit#getLocationInfo.", e)
-            None
-        }
-      case None => None
-    }
-    locs.getOrElse(split.getLocations.filter(_ != "localhost"))
-  }
-
-  override def persist(storageLevel: StorageLevel): this.type = {
-    if (storageLevel.deserialized) {
-      logWarning("Caching NewHadoopRDDs as deserialized objects usually leads 
to undesired" +
-        " behavior because Hadoop's RecordReader reuses the same Writable 
object for all records." +
-        " Use a map transformation to make copies of the records.")
-    }
-    super.persist(storageLevel)
-  }
-}
-
-private[spark] object SqlNewHadoopRDD {
-
-  /**
-   * The thread variable for the name of the current file being read. This is 
used by
-   * the InputFileName function in Spark SQL.
-   */
-  private[this] val inputFileName: ThreadLocal[UTF8String] = new 
ThreadLocal[UTF8String] {
-    override protected def initialValue(): UTF8String = 
UTF8String.fromString("")
-  }
-
-  def getInputFileName(): UTF8String = inputFileName.get()
-
-  private[spark] def setInputFileName(file: String) = 
inputFileName.set(UTF8String.fromString(file))
-
-  private[spark] def unsetInputFileName(): Unit = inputFileName.remove()
-
-  /**
-   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an 
InputSplit to
-   * the given function rather than the index of the partition.
-   */
-  private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: 
ClassTag](
-      prev: RDD[T],
-      f: (InputSplit, Iterator[T]) => Iterator[U],
-      preservesPartitioning: Boolean = false)
-    extends RDD[U](prev) {
-
-    override val partitioner = if (preservesPartitioning) 
firstParent[T].partitioner else None
-
-    override def getPartitions: Array[SparkPartition] = 
firstParent[T].partitions
-
-    override def compute(split: SparkPartition, context: TaskContext): 
Iterator[U] = {
-      val partition = split.asInstanceOf[SqlNewHadoopPartition]
-      val inputSplit = partition.serializableHadoopSplit.value
-      f(inputSplit, firstParent[T].iterator(split, context))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/58b4e4f8/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala 
b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala
new file mode 100644
index 0000000..3f15fff
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * State for SqlNewHadoopRDD objects. This is split this way because of the 
package splits.
+ * TODO: Move/Combine this with 
org.apache.spark.sql.datasources.SqlNewHadoopRDD
+ */
+private[spark] object SqlNewHadoopRDDState {
+  /**
+   * The thread variable for the name of the current file being read. This is 
used by
+   * the InputFileName function in Spark SQL.
+   */
+  private[this] val inputFileName: ThreadLocal[UTF8String] = new 
ThreadLocal[UTF8String] {
+    override protected def initialValue(): UTF8String = 
UTF8String.fromString("")
+  }
+
+  def getInputFileName(): UTF8String = inputFileName.get()
+
+  private[spark] def setInputFileName(file: String) = 
inputFileName.set(UTF8String.fromString(file))
+
+  private[spark] def unsetInputFileName(): Unit = inputFileName.remove()
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/58b4e4f8/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 3376936..b6979d0 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -17,7 +17,11 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
-import java.io.*;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -26,12 +30,26 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoSerializable;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.NullType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.UserDefinedType;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.bitset.BitSetMethods;
@@ -39,9 +57,23 @@ import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
-import static org.apache.spark.sql.types.DataTypes.*;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.ByteType;
+import static org.apache.spark.sql.types.DataTypes.DateType;
+import static org.apache.spark.sql.types.DataTypes.DoubleType;
+import static org.apache.spark.sql.types.DataTypes.FloatType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
+import static org.apache.spark.sql.types.DataTypes.NullType;
+import static org.apache.spark.sql.types.DataTypes.ShortType;
+import static org.apache.spark.sql.types.DataTypes.TimestampType;
 import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 /**
  * An Unsafe implementation of Row which is backed by raw memory instead of 
Java objects.
  *
@@ -116,11 +148,6 @@ public final class UnsafeRow extends MutableRow implements 
Externalizable, KryoS
   /** The size of this row's backing data, in bytes) */
   private int sizeInBytes;
 
-  private void setNotNullAt(int i) {
-    assertIndexIsValid(i);
-    BitSetMethods.unset(baseObject, baseOffset, i);
-  }
-
   /** The width of the null tracking bit set, in bytes */
   private int bitSetWidthInBytes;
 
@@ -187,6 +214,12 @@ public final class UnsafeRow extends MutableRow implements 
Externalizable, KryoS
     pointTo(buf, numFields, sizeInBytes);
   }
 
+
+  public void setNotNullAt(int i) {
+    assertIndexIsValid(i);
+    BitSetMethods.unset(baseObject, baseOffset, i);
+  }
+
   @Override
   public void setNullAt(int i) {
     assertIndexIsValid(i);

http://git-wip-us.apache.org/repos/asf/spark/blob/58b4e4f8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
index d809877..bf21578 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.rdd.SqlNewHadoopRDD
+import org.apache.spark.rdd.SqlNewHadoopRDDState
 import org.apache.spark.sql.catalyst.InternalRow
 import 
org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, 
CodeGenContext}
 import org.apache.spark.sql.types.{DataType, StringType}
@@ -37,13 +37,13 @@ case class InputFileName() extends LeafExpression with 
Nondeterministic {
   override protected def initInternal(): Unit = {}
 
   override protected def evalInternal(input: InternalRow): UTF8String = {
-    SqlNewHadoopRDD.getInputFileName()
+    SqlNewHadoopRDDState.getInputFileName()
   }
 
   override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): 
String = {
     ev.isNull = "false"
     s"final ${ctx.javaType(dataType)} ${ev.value} = " +
-      "org.apache.spark.rdd.SqlNewHadoopRDD.getInputFileName();"
+      "org.apache.spark.rdd.SqlNewHadoopRDDState.getInputFileName();"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/58b4e4f8/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 8a92e48..dade488 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -109,6 +109,19 @@ public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBas
   private static final int DEFAULT_VAR_LEN_SIZE = 32;
 
   /**
+   * Tries to initialize the reader for this split. Returns true if this 
reader supports reading
+   * this split and false otherwise.
+   */
+  public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext) {
+    try {
+      initialize(inputSplit, taskAttemptContext);
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  /**
    * Implementation of RecordReader API.
    */
   @Override
@@ -326,6 +339,7 @@ public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBas
         } else {
           rowWriters[n].write(col, bytes.array(), bytes.position(), len);
         }
+        rows[n].setNotNullAt(col);
       } else {
         rows[n].setNullAt(col);
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/58b4e4f8/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f40e603..5ef3a48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -323,6 +323,11 @@ private[spark] object SQLConf {
       "option must be set in Hadoop Configuration.  2. This option overrides " 
+
       "\"spark.sql.sources.outputCommitterClass\".")
 
+  val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = booleanConf(
+    key = "spark.sql.parquet.enableUnsafeRowRecordReader",
+    defaultValue = Some(true),
+    doc = "Enables using the custom ParquetUnsafeRowRecordReader.")
+
   val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
     defaultValue = Some(false),
     doc = "When true, enable filter pushdown for ORC files.")

http://git-wip-us.apache.org/repos/asf/spark/blob/58b4e4f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
new file mode 100644
index 0000000..56cb63d
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.sql.{SQLConf, SQLContext}
+import 
org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
+import org.apache.spark.{Partition => SparkPartition, _}
+
+
+private[spark] class SqlNewHadoopPartition(
+    rddId: Int,
+    val index: Int,
+    rawSplit: InputSplit with Writable)
+  extends SparkPartition {
+
+  val serializableHadoopSplit = new SerializableWritable(rawSplit)
+
+  override def hashCode(): Int = 41 * (41 + rddId) + index
+}
+
+/**
+ * An RDD that provides core functionality for reading data stored in Hadoop 
(e.g., files in HDFS,
+ * sources in HBase, or S3), using the new MapReduce API 
(`org.apache.hadoop.mapreduce`).
+ * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three 
additions.
+ * 1. A shared broadcast Hadoop Configuration.
+ * 2. An optional closure `initDriverSideJobFuncOpt` that sets configurations 
at the driver side
+ *    to the shared Hadoop Configuration.
+ * 3. An optional closure `initLocalJobFuncOpt` that sets configurations at 
both the driver side
+ *    and the executor side to the shared Hadoop Configuration.
+ *
+ * Note: This RDD is basically a cloned version of 
[[org.apache.spark.rdd.NewHadoopRDD]] with
+ * changes based on [[org.apache.spark.rdd.HadoopRDD]].
+ */
+private[spark] class SqlNewHadoopRDD[V: ClassTag](
+    sqlContext: SQLContext,
+    broadcastedConf: Broadcast[SerializableConfiguration],
+    @transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
+    initLocalJobFuncOpt: Option[Job => Unit],
+    inputFormatClass: Class[_ <: InputFormat[Void, V]],
+    valueClass: Class[V])
+    extends RDD[V](sqlContext.sparkContext, Nil)
+  with SparkHadoopMapReduceUtil
+  with Logging {
+
+  protected def getJob(): Job = {
+    val conf: Configuration = broadcastedConf.value.value
+    // "new Job" will make a copy of the conf. Then, it is
+    // safe to mutate conf properties with initLocalJobFuncOpt
+    // and initDriverSideJobFuncOpt.
+    val newJob = new Job(conf)
+    initLocalJobFuncOpt.map(f => f(newJob))
+    newJob
+  }
+
+  def getConf(isDriverSide: Boolean): Configuration = {
+    val job = getJob()
+    if (isDriverSide) {
+      initDriverSideJobFuncOpt.map(f => f(job))
+    }
+    SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+  }
+
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+
+  @transient protected val jobId = new JobID(jobTrackerId, id)
+
+  // If true, enable using the custom RecordReader for parquet. This only 
works for
+  // a subset of the types (no complex types).
+  protected val enableUnsafeRowParquetReader: Boolean =
+    
sqlContext.getConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key).toBoolean
+
+  override def getPartitions: Array[SparkPartition] = {
+    val conf = getConf(isDriverSide = true)
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[SparkPartition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) =
+        new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit 
with Writable])
+    }
+    result
+  }
+
+  override def compute(
+    theSplit: SparkPartition,
+    context: TaskContext): Iterator[V] = {
+    val iter = new Iterator[V] {
+      val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
+      logInfo("Input split: " + split.serializableHadoopSplit)
+      val conf = getConf(isDriverSide = false)
+
+      val inputMetrics = context.taskMetrics
+        .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+
+      // Sets the thread local variable for the file's name
+      split.serializableHadoopSplit.value match {
+        case fs: FileSplit => 
SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
+        case _ => SqlNewHadoopRDDState.unsetInputFileName()
+      }
+
+      // Find a function that will return the FileSystem bytes read by this 
thread. Do this before
+      // creating RecordReader, because RecordReader's constructor might read 
some bytes
+      val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
+        split.serializableHadoopSplit.value match {
+          case _: FileSplit | _: CombineFileSplit =>
+            SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+          case _ => None
+        }
+      }
+      inputMetrics.setBytesReadCallback(bytesReadCallback)
+
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, 
split.index, 0)
+      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+      val format = inputFormatClass.newInstance
+      format match {
+        case configurable: Configurable =>
+          configurable.setConf(conf)
+        case _ =>
+      }
+      private[this] var reader: RecordReader[Void, V] = null
+
+      /**
+        * If the format is ParquetInputFormat, try to create the optimized 
RecordReader. If this
+        * fails (for example, unsupported schema), try with the normal reader.
+        * TODO: plumb this through a different way?
+        */
+      if (enableUnsafeRowParquetReader &&
+        format.getClass.getName == 
"org.apache.parquet.hadoop.ParquetInputFormat") {
+        val parquetReader: UnsafeRowParquetRecordReader = new 
UnsafeRowParquetRecordReader()
+        if (!parquetReader.tryInitialize(
+            split.serializableHadoopSplit.value, hadoopAttemptContext)) {
+          parquetReader.close()
+        } else {
+          reader = parquetReader.asInstanceOf[RecordReader[Void, V]]
+        }
+      }
+
+      if (reader == null) {
+        reader = format.createRecordReader(
+          split.serializableHadoopSplit.value, hadoopAttemptContext)
+        reader.initialize(split.serializableHadoopSplit.value, 
hadoopAttemptContext)
+      }
+
+      // Register an on-task-completion callback to close the input stream.
+      context.addTaskCompletionListener(context => close())
+
+      private[this] var havePair = false
+      private[this] var finished = false
+
+      override def hasNext: Boolean = {
+        if (context.isInterrupted) {
+          throw new TaskKilledException
+        }
+        if (!finished && !havePair) {
+          finished = !reader.nextKeyValue
+          if (finished) {
+            // Close and release the reader here; close() will also be called 
when the task
+            // completes, but for tasks that read from many files, it helps to 
release the
+            // resources early.
+            close()
+          }
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): V = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        if (!finished) {
+          inputMetrics.incRecordsRead(1)
+        }
+        reader.getCurrentValue
+      }
+
+      private def close() {
+        if (reader != null) {
+          SqlNewHadoopRDDState.unsetInputFileName()
+          // Close the reader and release it. Note: it's very important that 
we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 
when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to 
non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null
+          }
+          if (bytesReadCallback.isDefined) {
+            inputMetrics.updateBytesRead()
+          } else if 
(split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+            
split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+            // If we can't get the bytes read from the FS stats, fall back to 
the split size,
+            // which may be inaccurate.
+            try {
+              
inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+            } catch {
+              case e: java.io.IOException =>
+                logWarning("Unable to get input size to set InputMetrics for 
task", e)
+            }
+          }
+        }
+      }
+    }
+    iter
+  }
+
+  override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {
+    val split = 
hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value
+    val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
+      case Some(c) =>
+        try {
+          val infos = 
c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
+          Some(HadoopRDD.convertSplitLocationInfo(infos))
+        } catch {
+          case e : Exception =>
+            logDebug("Failed to use InputSplit#getLocationInfo.", e)
+            None
+        }
+      case None => None
+    }
+    locs.getOrElse(split.getLocations.filter(_ != "localhost"))
+  }
+
+  override def persist(storageLevel: StorageLevel): this.type = {
+    if (storageLevel.deserialized) {
+      logWarning("Caching NewHadoopRDDs as deserialized objects usually leads 
to undesired" +
+        " behavior because Hadoop's RecordReader reuses the same Writable 
object for all records." +
+        " Use a map transformation to make copies of the records.")
+    }
+    super.persist(storageLevel)
+  }
+
+  /**
+   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an 
InputSplit to
+   * the given function rather than the index of the partition.
+   */
+  private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: 
ClassTag](
+      prev: RDD[T],
+      f: (InputSplit, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false)
+    extends RDD[U](prev) {
+
+    override val partitioner = if (preservesPartitioning) 
firstParent[T].partitioner else None
+
+    override def getPartitions: Array[SparkPartition] = 
firstParent[T].partitions
+
+    override def compute(split: SparkPartition, context: TaskContext): 
Iterator[U] = {
+      val partition = split.asInstanceOf[SqlNewHadoopPartition]
+      val inputSplit = partition.serializableHadoopSplit.value
+      f(inputSplit, firstParent[T].iterator(split, context))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/58b4e4f8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index cb0aab8..fdd745f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -319,7 +319,7 @@ private[sql] class ParquetRelation(
 
     Utils.withDummyCallSite(sqlContext.sparkContext) {
       new SqlNewHadoopRDD(
-        sc = sqlContext.sparkContext,
+        sqlContext = sqlContext,
         broadcastedConf = broadcastedConf,
         initDriverSideJobFuncOpt = Some(setInputPaths),
         initLocalJobFuncOpt = Some(initLocalJobFuncOpt),

http://git-wip-us.apache.org/repos/asf/spark/blob/58b4e4f8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index c8028a5..cc5aae0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -337,29 +337,30 @@ class ParquetFilterSuite extends QueryTest with 
ParquetTest with SharedSQLContex
     }
   }
 
-  // Renable when we can toggle custom ParquetRecordReader on/off. The custom 
reader does
-  // not do row by row filtering (and we probably don't want to push that).
-  ignore("SPARK-11661 Still pushdown filters returned by unhandledFilters") {
+  // The unsafe row RecordReader does not support row by row filtering, so run 
this test with it disabled.
+  test("SPARK-11661 Still pushdown filters returned by unhandledFilters") {
     import testImplicits._
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withTempPath { dir =>
-        val path = s"${dir.getCanonicalPath}/part=1"
-        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
-        val df = sqlContext.read.parquet(path).filter("a = 2")
-
-        // This is the source RDD without Spark-side filtering.
-        val childRDD =
-          df
-            .queryExecution
-            .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
-            .child
-            .execute()
-
-        // The result should be single row.
-        // When a filter is pushed to Parquet, Parquet can apply it to every 
row.
-        // So, we can check the number of rows returned from the Parquet
-        // to make sure our filter pushdown work.
-        assert(childRDD.count == 1)
+      withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> 
"false") {
+        withTempPath { dir =>
+          val path = s"${dir.getCanonicalPath}/part=1"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+          val df = sqlContext.read.parquet(path).filter("a = 2")
+
+          // This is the source RDD without Spark-side filtering.
+          val childRDD =
+            df
+              .queryExecution
+              .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
+              .child
+              .execute()
+
+          // The result should be single row.
+          // When a filter is pushed to Parquet, Parquet can apply it to every 
row.
+          // So, we can check the number of rows returned from the Parquet
+          // to make sure our filter pushdown works.
+          assert(childRDD.count == 1)
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/58b4e4f8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 177ab42..0c5d488 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -579,6 +579,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
     }
   }
 
+  test("null and non-null strings") {
+    // Create a dataset where the first values are NULL and then some non-null 
values. The
+    // number of non-nulls needs to be bigger than the ParquetReader batch 
size.
+    val data = sqlContext.range(200).map { i =>
+      if (i.getLong(0) < 150) Row(None)
+      else Row("a")
+    }
+    val df = sqlContext.createDataFrame(data, StructType(StructField("col", 
StringType) :: Nil))
+    assert(df.agg("col" -> "count").collect().head.getLong(0) == 50)
+
+    withTempPath { dir =>
+      val path = s"${dir.getCanonicalPath}/data"
+      df.write.parquet(path)
+
+      val df2 = sqlContext.read.parquet(path)
+      assert(df2.agg("col" -> "count").collect().head.getLong(0) == 50)
+    }
+  }
+
   test("read dictionary encoded decimals written as INT32") {
     checkAnswer(
       // Decimal column in this file is encoded using plain dictionary


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to