git commit: Made rdd.py pep8 compliant by using Autopep8 and a little manual editing.

2014-07-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 635888cbe - aab534966


Made rdd.py pep8 compliant by using Autopep8 and a little manual editing.

Author: Prashant Sharma prashan...@imaginea.com

Closes #1354 from ScrapCodes/pep8-comp-1 and squashes the following commits:

9858ea8 [Prashant Sharma] Code Review
d8851b7 [Prashant Sharma] Found # noqa works even inside comment blocks. Not 
sure if it works with all versions of python.
10c0cef [Prashant Sharma] Made rdd.py pep8 compliant by using Autopep8 and a 
little manual tweaking.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aab53496
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aab53496
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aab53496

Branch: refs/heads/master
Commit: aab5349660109481ee944721d611771da5a93109
Parents: 635888c
Author: Prashant Sharma prashan...@imaginea.com
Authored: Mon Jul 14 00:42:59 2014 -0700
Committer: Reynold Xin r...@apache.org
Committed: Mon Jul 14 00:42:59 2014 -0700

--
 python/pyspark/rdd.py | 150 +++--
 1 file changed, 92 insertions(+), 58 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aab53496/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index f64f48e..0c35c66 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -69,16 +69,19 @@ def _extract_concise_traceback():
 file, line, fun, what = tb[0]
 return callsite(function=fun, file=file, linenum=line)
 sfile, sline, sfun, swhat = tb[first_spark_frame]
-ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
+ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
 return callsite(function=sfun, file=ufile, linenum=uline)
 
 _spark_stack_depth = 0
 
+
 class _JavaStackTrace(object):
+
 def __init__(self, sc):
 tb = _extract_concise_traceback()
 if tb is not None:
-self._traceback = %s at %s:%s % (tb.function, tb.file, 
tb.linenum)
+self._traceback = %s at %s:%s % (
+tb.function, tb.file, tb.linenum)
 else:
 self._traceback = Error! Could not extract traceback info
 self._context = sc
@@ -95,7 +98,9 @@ class _JavaStackTrace(object):
 if _spark_stack_depth == 0:
 self._context._jsc.setCallSite(None)
 
+
 class MaxHeapQ(object):
+
 
 An implementation of MaxHeap.
  import pyspark.rdd
@@ -117,14 +122,14 @@ class MaxHeapQ(object):
 
 
 def __init__(self, maxsize):
-# we start from q[1], this makes calculating children as trivial as 2 
* k
+# We start from q[1], so its children are always  2 * k
 self.q = [0]
 self.maxsize = maxsize
 
 def _swim(self, k):
-while (k  1) and (self.q[k/2]  self.q[k]):
-self._swap(k, k/2)
-k = k/2
+while (k  1) and (self.q[k / 2]  self.q[k]):
+self._swap(k, k / 2)
+k = k / 2
 
 def _swap(self, i, j):
 t = self.q[i]
@@ -162,7 +167,9 @@ class MaxHeapQ(object):
 self.q[1] = value
 self._sink(1)
 
+
 class RDD(object):
+
 
 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
 Represents an immutable, partitioned collection of elements that can be
@@ -257,7 +264,8 @@ class RDD(object):
  sorted(rdd.map(lambda x: (x, 1)).collect())
 [('a', 1), ('b', 1), ('c', 1)]
 
-def func(split, iterator): return imap(f, iterator)
+def func(split, iterator):
+return imap(f, iterator)
 return PipelinedRDD(self, func, preservesPartitioning)
 
 def flatMap(self, f, preservesPartitioning=False):
@@ -271,7 +279,8 @@ class RDD(object):
  sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
 [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
 
-def func(s, iterator): return chain.from_iterable(imap(f, iterator))
+def func(s, iterator):
+return chain.from_iterable(imap(f, iterator))
 return self.mapPartitionsWithIndex(func, preservesPartitioning)
 
 def mapPartitions(self, f, preservesPartitioning=False):
@@ -283,7 +292,8 @@ class RDD(object):
  rdd.mapPartitions(f).collect()
 [3, 7]
 
-def func(s, iterator): return f(iterator)
+def func(s, iterator):
+return f(iterator)
 return self.mapPartitionsWithIndex(func)
 
 def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
@@ -311,17 +321,17 @@ class RDD(object):
 6
 
 warnings.warn(mapPartitionsWithSplit is deprecated; 
-use mapPartitionsWithIndex instead, 

Git Push Summary

2014-07-14 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.0.1-rc3 [created] df393cff3


git commit: [maven-release-plugin] prepare release v1.0.1-rc3

2014-07-14 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 effa69f9c - 70ee14f76


[maven-release-plugin] prepare release v1.0.1-rc3


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70ee14f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70ee14f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70ee14f7

Branch: refs/heads/branch-1.0
Commit: 70ee14f76d6c3d3f162db6bbe12797c252a0295a
Parents: effa69f
Author: Ubuntu ubu...@ip-172-31-8-77.us-west-2.compute.internal
Authored: Mon Jul 14 07:46:30 2014 +0000
Committer: Ubuntu ubu...@ip-172-31-8-77.us-west-2.compute.internal
Committed: Mon Jul 14 07:46:30 2014 +0000

--
 assembly/pom.xml  | 2 +-
 bagel/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 examples/pom.xml  | 2 +-
 external/flume/pom.xml| 2 +-
 external/kafka/pom.xml| 2 +-
 external/mqtt/pom.xml | 2 +-
 external/twitter/pom.xml  | 2 +-
 external/zeromq/pom.xml   | 2 +-
 extras/spark-ganglia-lgpl/pom.xml | 2 +-
 graphx/pom.xml| 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 4 ++--
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 yarn/stable/pom.xml   | 2 +-
 21 files changed, 22 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index e290e79..6735379 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   parent
 groupIdorg.apache.spark/groupId
 artifactIdspark-parent/artifactId
-version1.0.2-SNAPSHOT/version
+version1.0.2/version
 relativePath../pom.xml/relativePath
   /parent
 

http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index c8ad40f..8a38b43 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   parent
 groupIdorg.apache.spark/groupId
 artifactIdspark-parent/artifactId
-version1.0.2-SNAPSHOT/version
+version1.0.2/version
 relativePath../pom.xml/relativePath
   /parent
 

http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 2302d7b..23eea6e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   parent
 groupIdorg.apache.spark/groupId
 artifactIdspark-parent/artifactId
-version1.0.2-SNAPSHOT/version
+version1.0.2/version
 relativePath../pom.xml/relativePath
   /parent
 

http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 9156a11..d158a75 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   parent
 groupIdorg.apache.spark/groupId
 artifactIdspark-parent/artifactId
-version1.0.2-SNAPSHOT/version
+version1.0.2/version
 relativePath../pom.xml/relativePath
   /parent
 

http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 1cefa15..f5f0d54 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@
   parent
 groupIdorg.apache.spark/groupId
 artifactIdspark-parent/artifactId
-version1.0.2-SNAPSHOT/version
+version1.0.2/version
 relativePath../../pom.xml/relativePath
   /parent
 

http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/external/kafka/pom.xml
--
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index cc05e69..8bc5c03 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -21,7 +21,7 @@
   parent
 groupIdorg.apache.spark/groupId
 artifactIdspark-parent/artifactId
-version1.0.2-SNAPSHOT/version
+version1.0.2/version
 relativePath../../pom.xml/relativePath
   /parent
 

http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/external/mqtt/pom.xml
--
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 

git commit: [SPARK-2443][SQL] Fix slow read from partitioned tables

2014-07-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 38ccd6ebd - d60b09bb6


[SPARK-2443][SQL] Fix slow read from partitioned tables

This fix obtains a performance boost comparable to that of [PR 
#1390](https://github.com/apache/spark/pull/1390) by moving an array update and 
deserializer initialization out of a potentially very long loop. Suggested by 
yhuai. The results below are updated for this fix.

## Benchmarks
Generated a local text file with 10M rows of simple key-value pairs. The data 
is loaded as a table through Hive. Results are obtained on my local machine 
using hive/console.

Without the fix:

Type | Non-partitioned | Partitioned (1 part)
--- | --- | ---
First run | 9.52s end-to-end (1.64s Spark job) | 36.6s (28.3s)
Stabilized runs | 1.21s (1.18s) | 27.6s (27.5s)

With this fix:

Type | Non-partitioned | Partitioned (1 part)
--- | --- | ---
First run | 9.57s (1.46s) | 11.0s (1.69s)
Stabilized runs | 1.13s (1.10s) | 1.23s (1.19s)

Author: Zongheng Yang zonghen...@gmail.com

Closes #1408 from concretevitamin/slow-read-2 and squashes the following 
commits:

d86e437 [Zongheng Yang] Move update & initialization out of potentially long 
loop.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d60b09bb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d60b09bb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d60b09bb

Branch: refs/heads/master
Commit: d60b09bb60cff106fa0acddebf35714503b20f03
Parents: 38ccd6e
Author: Zongheng Yang zonghen...@gmail.com
Authored: Mon Jul 14 13:22:24 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Jul 14 13:22:24 2014 -0700

--
 .../scala/org/apache/spark/sql/hive/TableReader.scala | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d60b09bb/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 8cfde46..c394257 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -164,13 +164,17 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, 
@transient sc: HiveCon
   hivePartitionRDD.mapPartitions { iter =
 val hconf = broadcastedHiveConf.value.value
 val rowWithPartArr = new Array[Object](2)
+
+// The update and deserializer initialization are intentionally
+// kept out of the below iter.map loop to save performance.
+rowWithPartArr.update(1, partValues)
+val deserializer = localDeserializer.newInstance()
+deserializer.initialize(hconf, partProps)
+
 // Map each tuple to a row object
 iter.map { value =
-  val deserializer = localDeserializer.newInstance()
-  deserializer.initialize(hconf, partProps)
   val deserializedRow = deserializer.deserialize(value)
   rowWithPartArr.update(0, deserializedRow)
-  rowWithPartArr.update(1, partValues)
   rowWithPartArr.asInstanceOf[Object]
 }
   }



git commit: [SPARK-2443][SQL] Fix slow read from partitioned tables

2014-07-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 baf92a0f2 - 2ec7d7ab7


[SPARK-2443][SQL] Fix slow read from partitioned tables

This fix obtains a performance boost comparable to that of [PR 
#1390](https://github.com/apache/spark/pull/1390) by moving an array update and 
deserializer initialization out of a potentially very long loop. Suggested by 
yhuai. The results below are updated for this fix.

## Benchmarks
Generated a local text file with 10M rows of simple key-value pairs. The data 
is loaded as a table through Hive. Results are obtained on my local machine 
using hive/console.

Without the fix:

Type | Non-partitioned | Partitioned (1 part)
--- | --- | ---
First run | 9.52s end-to-end (1.64s Spark job) | 36.6s (28.3s)
Stabilized runs | 1.21s (1.18s) | 27.6s (27.5s)

With this fix:

Type | Non-partitioned | Partitioned (1 part)
--- | --- | ---
First run | 9.57s (1.46s) | 11.0s (1.69s)
Stabilized runs | 1.13s (1.10s) | 1.23s (1.19s)

Author: Zongheng Yang zonghen...@gmail.com

Closes #1408 from concretevitamin/slow-read-2 and squashes the following 
commits:

d86e437 [Zongheng Yang] Move update & initialization out of potentially long 
loop.

(cherry picked from commit d60b09bb60cff106fa0acddebf35714503b20f03)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ec7d7ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ec7d7ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ec7d7ab

Branch: refs/heads/branch-1.0
Commit: 2ec7d7ab751be67a86a048eed85bd9fd36dfaf83
Parents: baf92a0
Author: Zongheng Yang zonghen...@gmail.com
Authored: Mon Jul 14 13:22:24 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Jul 14 13:22:39 2014 -0700

--
 .../scala/org/apache/spark/sql/hive/TableReader.scala | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2ec7d7ab/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 8cfde46..c394257 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -164,13 +164,17 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, 
@transient sc: HiveCon
   hivePartitionRDD.mapPartitions { iter =
 val hconf = broadcastedHiveConf.value.value
 val rowWithPartArr = new Array[Object](2)
+
+// The update and deserializer initialization are intentionally
+// kept out of the below iter.map loop to save performance.
+rowWithPartArr.update(1, partValues)
+val deserializer = localDeserializer.newInstance()
+deserializer.initialize(hconf, partProps)
+
 // Map each tuple to a row object
 iter.map { value =
-  val deserializer = localDeserializer.newInstance()
-  deserializer.initialize(hconf, partProps)
   val deserializedRow = deserializer.deserialize(value)
   rowWithPartArr.update(0, deserializedRow)
-  rowWithPartArr.update(1, partValues)
   rowWithPartArr.asInstanceOf[Object]
 }
   }



git commit: [SPARK-1946] Submit tasks after (configured ratio) executors have been registered

2014-07-14 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master d60b09bb6 - 3dd8af7a6


[SPARK-1946] Submit tasks after (configured ratio) executors have been 
registered

Because submitting tasks and registering executors are asynchronous, in most 
situations the tasks of early stages run without preferred locality.

A simple solution is to sleep for a few seconds in the application, so that 
executors have enough time to register.

This PR adds 2 configuration properties that make TaskScheduler submit tasks 
only after a sufficient number of executors have been registered.

\# Submit tasks only after (registered executors / total executors) has reached 
this ratio; the default value is 0
spark.scheduler.minRegisteredExecutorsRatio = 0.8

\# Whether or not minRegisteredExecutorsRatio has been reached, submit tasks after 
maxRegisteredExecutorsWaitingTime (milliseconds); the default value is 30000
spark.scheduler.maxRegisteredExecutorsWaitingTime = 5000

Author: li-zhihui zhihui...@intel.com

Closes #900 from li-zhihui/master and squashes the following commits:

b9f8326 [li-zhihui] Add logs & edit docs
1ac08b1 [li-zhihui] Add new configs to user docs
22ead12 [li-zhihui] Move waitBackendReady to postStartHook
c6f0522 [li-zhihui] Bug fix: numExecutors wasn't set & use constant 
DEFAULT_NUMBER_EXECUTORS
4d6d847 [li-zhihui] Move waitBackendReady to TaskSchedulerImpl.start & some 
code refactor
0ecee9a [li-zhihui] Move waitBackendReady from DAGScheduler.submitStage to 
TaskSchedulerImpl.submitTasks
4261454 [li-zhihui] Add docs for new configs & code style
ce0868a [li-zhihui] Code style, rename configuration property name of 
minRegisteredRatio & maxRegisteredWaitingTime
6cfb9ec [li-zhihui] Code style, revert default minRegisteredRatio of yarn to 0, 
driver get --num-executors in yarn/alpha
812c33c [li-zhihui] Fix driver lost --num-executors option in yarn-cluster mode
e7b6272 [li-zhihui] support yarn-cluster
37f7dc2 [li-zhihui] support yarn mode(percentage style)
3f8c941 [li-zhihui] submit stage after (configured ratio of) executors have 
been registered


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3dd8af7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3dd8af7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3dd8af7a

Branch: refs/heads/master
Commit: 3dd8af7a6623201c28231f4b71f59ea4e9ae29bf
Parents: d60b09b
Author: li-zhihui zhihui...@intel.com
Authored: Mon Jul 14 15:32:49 2014 -0500
Committer: Thomas Graves tgra...@apache.org
Committed: Mon Jul 14 15:32:49 2014 -0500

--
 .../scala/org/apache/spark/SparkContext.scala   | 11 +-
 .../spark/scheduler/SchedulerBackend.scala  |  1 +
 .../spark/scheduler/TaskSchedulerImpl.scala | 15 
 .../cluster/CoarseGrainedSchedulerBackend.scala | 29 ++
 .../cluster/SparkDeploySchedulerBackend.scala   |  1 +
 docs/configuration.md   | 19 ++
 .../spark/deploy/yarn/ApplicationMaster.scala   |  1 +
 .../yarn/ApplicationMasterArguments.scala   |  6 ++-
 .../cluster/YarnClientClusterScheduler.scala|  2 +
 .../cluster/YarnClientSchedulerBackend.scala|  1 +
 .../cluster/YarnClusterScheduler.scala  |  2 +
 .../cluster/YarnClusterSchedulerBackend.scala   | 40 
 .../spark/deploy/yarn/ApplicationMaster.scala   |  1 +
 13 files changed, 127 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 8819e73..8052499 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
 throw new SparkException(YARN mode not available ?, e)
   }
 }
-val backend = new CoarseGrainedSchedulerBackend(scheduler, 
sc.env.actorSystem)
+val backend = try {
+  val clazz =
+
Class.forName(org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend)
+  val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], 
classOf[SparkContext])
+  cons.newInstance(scheduler, 
sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+} catch {
+  case e: Exception = {
+throw new SparkException(YARN mode not available ?, e)
+  }
+}
 scheduler.initialize(backend)
 scheduler
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
--
diff --git 

Subscribe

2014-07-14 Thread Mubarak Seyed



git commit: [SPARK-2446][SQL] Add BinaryType support to Parquet I/O.

2014-07-14 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 3dd8af7a6 - 9fe693b5b


[SPARK-2446][SQL] Add BinaryType support to Parquet I/O.

Note that this commit changes the semantics when loading in data that was 
created with prior versions of Spark SQL.  Before, we were writing out strings 
as Binary data without adding any other annotations. Thus, when data is read in 
from prior versions, data that was StringType will now become BinaryType.  
Users that need strings can CAST that column to a String.  It was decided that 
while this breaks compatibility, it does make us compatible with other systems 
(Hive, Thrift, etc) and adds support for Binary data, so this is the right 
decision long term.

To support `BinaryType`, the following changes are needed:
- Make `StringType` use `OriginalType.UTF8`
- Add `BinaryType` using `PrimitiveTypeName.BINARY` without `OriginalType`

Author: Takuya UESHIN ues...@happy-camper.st

Closes #1373 from ueshin/issues/SPARK-2446 and squashes the following commits:

ecacb92 [Takuya UESHIN] Add BinaryType support to Parquet I/O.
616e04a [Takuya UESHIN] Make StringType use OriginalType.UTF8.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fe693b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fe693b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fe693b5

Branch: refs/heads/master
Commit: 9fe693b5b6ed6af34ee1e800ab89c8a11991ea38
Parents: 3dd8af7
Author: Takuya UESHIN ues...@happy-camper.st
Authored: Mon Jul 14 15:42:28 2014 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Jul 14 15:42:35 2014 -0700

--
 .../spark/sql/parquet/ParquetConverter.scala|  2 +-
 .../spark/sql/parquet/ParquetTableSupport.scala |  4 ++
 .../spark/sql/parquet/ParquetTestData.scala | 18 +++---
 .../apache/spark/sql/parquet/ParquetTypes.scala | 62 ++--
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 16 +++--
 5 files changed, 57 insertions(+), 45 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9fe693b5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 75748b2..de8fe2d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -114,7 +114,7 @@ private[sql] object CatalystConverter {
 }
   }
   // All other primitive types use the default converter
-  case ctype: NativeType = { // note: need the type tag here!
+  case ctype: PrimitiveType = { // note: need the type tag here!
 new CatalystPrimitiveConverter(parent, fieldIndex)
   }
   case _ = throw new RuntimeException(

http://git-wip-us.apache.org/repos/asf/spark/blob/9fe693b5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 108f8b6..f1953a0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -191,6 +191,8 @@ private[parquet] class RowWriteSupport extends 
WriteSupport[Row] with Logging {
 value.asInstanceOf[String].getBytes(utf-8)
   )
 )
+case BinaryType = writer.addBinary(
+  Binary.fromByteArray(value.asInstanceOf[Array[Byte]]))
 case IntegerType = writer.addInteger(value.asInstanceOf[Int])
 case ShortType = writer.addInteger(value.asInstanceOf[Short])
 case LongType = writer.addLong(value.asInstanceOf[Long])
@@ -299,6 +301,8 @@ private[parquet] class MutableRowWriteSupport extends 
RowWriteSupport {
   record(index).asInstanceOf[String].getBytes(utf-8)
 )
   )
+  case BinaryType = writer.addBinary(
+Binary.fromByteArray(record(index).asInstanceOf[Array[Byte]]))
   case IntegerType = writer.addInteger(record.getInt(index))
   case ShortType = writer.addInteger(record.getShort(index))
   case LongType = writer.addLong(record.getLong(index))

http://git-wip-us.apache.org/repos/asf/spark/blob/9fe693b5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
--
diff --git