git commit: Made rdd.py pep8 compliant by using autopep8 and a little manual editing.
Repository: spark Updated Branches: refs/heads/master 635888cbe -> aab534966 Made rdd.py pep8 compliant by using autopep8 and a little manual editing. Author: Prashant Sharma prashan...@imaginea.com Closes #1354 from ScrapCodes/pep8-comp-1 and squashes the following commits: 9858ea8 [Prashant Sharma] Code Review d8851b7 [Prashant Sharma] Found # noqa works even inside comment blocks. Not sure if it works with all versions of python. 10c0cef [Prashant Sharma] Made rdd.py pep8 compliant by using autopep8 and a little manual tweaking. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aab53496 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aab53496 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aab53496 Branch: refs/heads/master Commit: aab5349660109481ee944721d611771da5a93109 Parents: 635888c Author: Prashant Sharma prashan...@imaginea.com Authored: Mon Jul 14 00:42:59 2014 -0700 Committer: Reynold Xin r...@apache.org Committed: Mon Jul 14 00:42:59 2014 -0700 -- python/pyspark/rdd.py | 150 +++-- 1 file changed, 92 insertions(+), 58 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aab53496/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index f64f48e..0c35c66 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -69,16 +69,19 @@ def _extract_concise_traceback(): file, line, fun, what = tb[0] return callsite(function=fun, file=file, linenum=line) sfile, sline, sfun, swhat = tb[first_spark_frame] -ufile, uline, ufun, uwhat = tb[first_spark_frame-1] +ufile, uline, ufun, uwhat = tb[first_spark_frame - 1] return callsite(function=sfun, file=ufile, linenum=uline) _spark_stack_depth = 0 + class _JavaStackTrace(object): + def __init__(self, sc): tb = _extract_concise_traceback() if tb is not None: -self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum) +self._traceback = "%s at %s:%s" % ( +tb.function, tb.file, tb.linenum) else: self._traceback = "Error! Could not extract traceback info" self._context = sc @@ -95,7 +98,9 @@ class _JavaStackTrace(object): if _spark_stack_depth == 0: self._context._jsc.setCallSite(None) + class MaxHeapQ(object): + """ An implementation of MaxHeap. >>> import pyspark.rdd @@ -117,14 +122,14 @@ class MaxHeapQ(object): def __init__(self, maxsize): -# we start from q[1], this makes calculating children as trivial as 2 * k +# We start from q[1], so its children are always 2 * k self.q = [0] self.maxsize = maxsize def _swim(self, k): -while (k > 1) and (self.q[k/2] < self.q[k]): -self._swap(k, k/2) -k = k/2 +while (k > 1) and (self.q[k / 2] < self.q[k]): +self._swap(k, k / 2) +k = k / 2 def _swap(self, i, j): t = self.q[i] @@ -162,7 +167,9 @@ class MaxHeapQ(object): self.q[1] = value self._sink(1) + class RDD(object): + """ A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
Represents an immutable, partitioned collection of elements that can be @@ -257,7 +264,8 @@ class RDD(object): >>> sorted(rdd.map(lambda x: (x, 1)).collect()) [('a', 1), ('b', 1), ('c', 1)] -def func(split, iterator): return imap(f, iterator) +def func(split, iterator): +return imap(f, iterator) return PipelinedRDD(self, func, preservesPartitioning) def flatMap(self, f, preservesPartitioning=False): @@ -271,7 +279,8 @@ class RDD(object): >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] -def func(s, iterator): return chain.from_iterable(imap(f, iterator)) +def func(s, iterator): +return chain.from_iterable(imap(f, iterator)) return self.mapPartitionsWithIndex(func, preservesPartitioning) def mapPartitions(self, f, preservesPartitioning=False): @@ -283,7 +292,8 @@ class RDD(object): >>> rdd.mapPartitions(f).collect() [3, 7] -def func(s, iterator): return f(iterator) +def func(s, iterator): +return f(iterator) return self.mapPartitionsWithIndex(func) def mapPartitionsWithIndex(self, f, preservesPartitioning=False): @@ -311,17 +321,17 @@ class RDD(object): 6 warnings.warn("mapPartitionsWithSplit is deprecated; " -"use mapPartitionsWithIndex instead",
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.0.1-rc3 [created] df393cff3
git commit: [maven-release-plugin] prepare release v1.0.1-rc3
Repository: spark Updated Branches: refs/heads/branch-1.0 effa69f9c -> 70ee14f76 [maven-release-plugin] prepare release v1.0.1-rc3 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70ee14f7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70ee14f7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70ee14f7 Branch: refs/heads/branch-1.0 Commit: 70ee14f76d6c3d3f162db6bbe12797c252a0295a Parents: effa69f Author: Ubuntu ubu...@ip-172-31-8-77.us-west-2.compute.internal Authored: Mon Jul 14 07:46:30 2014 + Committer: Ubuntu ubu...@ip-172-31-8-77.us-west-2.compute.internal Committed: Mon Jul 14 07:46:30 2014 + -- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 4 ++-- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- yarn/stable/pom.xml | 2 +- 21 files changed, 22 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index e290e79..6735379 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> -<version>1.0.2-SNAPSHOT</version> +<version>1.0.2</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index c8ad40f..8a38b43 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> -<version>1.0.2-SNAPSHOT</version> +<version>1.0.2</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 2302d7b..23eea6e 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> -<version>1.0.2-SNAPSHOT</version> +<version>1.0.2</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 9156a11..d158a75 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> -<version>1.0.2-SNAPSHOT</version> +<version>1.0.2</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/external/flume/pom.xml -- diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 1cefa15..f5f0d54 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> -<version>1.0.2-SNAPSHOT</version> +<version>1.0.2</version> <relativePath>../../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/external/kafka/pom.xml -- diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index cc05e69..8bc5c03 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -21,7 +21,7 @@ <parent> <groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId> -<version>1.0.2-SNAPSHOT</version> +<version>1.0.2</version> <relativePath>../../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/spark/blob/70ee14f7/external/mqtt/pom.xml -- diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index
git commit: [SPARK-2443][SQL] Fix slow read from partitioned tables
Repository: spark Updated Branches: refs/heads/master 38ccd6ebd -> d60b09bb6 [SPARK-2443][SQL] Fix slow read from partitioned tables This fix achieves a performance boost comparable to [PR #1390](https://github.com/apache/spark/pull/1390) by moving an array update and deserializer initialization out of a potentially very long loop. Suggested by yhuai. The results below are updated for this fix.

## Benchmarks

Generated a local text file with 10M rows of simple key-value pairs. The data is loaded as a table through Hive. Results are obtained on my local machine using hive/console.

Without the fix:

Type | Non-partitioned | Partitioned (1 part)
---- | --------------- | --------------------
First run | 9.52s end-to-end (1.64s Spark job) | 36.6s (28.3s)
Stabilized runs | 1.21s (1.18s) | 27.6s (27.5s)

With this fix:

Type | Non-partitioned | Partitioned (1 part)
---- | --------------- | --------------------
First run | 9.57s (1.46s) | 11.0s (1.69s)
Stabilized runs | 1.13s (1.10s) | 1.23s (1.19s)

Author: Zongheng Yang zonghen...@gmail.com Closes #1408 from concretevitamin/slow-read-2 and squashes the following commits: d86e437 [Zongheng Yang] Move update initialization out of potentially long loop. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d60b09bb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d60b09bb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d60b09bb Branch: refs/heads/master Commit: d60b09bb60cff106fa0acddebf35714503b20f03 Parents: 38ccd6e Author: Zongheng Yang zonghen...@gmail.com Authored: Mon Jul 14 13:22:24 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Jul 14 13:22:24 2014 -0700 -- .../scala/org/apache/spark/sql/hive/TableReader.scala | 10 +++--- 1 file changed, 7 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d60b09bb/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 8cfde46..c394257 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -164,13 +164,17 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon hivePartitionRDD.mapPartitions { iter => val hconf = broadcastedHiveConf.value.value val rowWithPartArr = new Array[Object](2) + +// The update and deserializer initialization are intentionally +// kept out of the below iter.map loop to save performance. +rowWithPartArr.update(1, partValues) +val deserializer = localDeserializer.newInstance() +deserializer.initialize(hconf, partProps) + // Map each tuple to a row object iter.map { value => - val deserializer = localDeserializer.newInstance() - deserializer.initialize(hconf, partProps) val deserializedRow = deserializer.deserialize(value) rowWithPartArr.update(0, deserializedRow) - rowWithPartArr.update(1, partValues) rowWithPartArr.asInstanceOf[Object] } }
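[Editor's note] The essence of the fix is hoisting per-partition setup out of the per-record loop. Below is a minimal, generic Scala sketch of that pattern, not the Spark SQL code itself: the `Decoder` class and its CSV-style `decode` method are hypothetical stand-ins for the Hive deserializer, and only the shape of the `mapPartitions` call mirrors the actual change.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for an object that is expensive to construct,
// like the Hive deserializer in HadoopTableReader.
class Decoder {
  def decode(line: String): Array[String] = line.split(",")
}

object HoistInitExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hoist-init").setMaster("local[2]"))
    val lines = sc.parallelize(Seq("a,1", "b,2", "c,3"))

    val rows = lines.mapPartitions { iter =>
      val decoder = new Decoder()             // built once per partition ...
      iter.map(line => decoder.decode(line))  // ... not once per record inside the loop
    }

    rows.collect().foreach(r => println(r.mkString("|")))
    sc.stop()
  }
}
```

Constructing the decoder once per partition amortizes its cost over every record in that partition, which is why the partitioned-table times in the benchmark drop from roughly 27s to about 1.2s.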
git commit: [SPARK-2443][SQL] Fix slow read from partitioned tables
Repository: spark Updated Branches: refs/heads/branch-1.0 baf92a0f2 -> 2ec7d7ab7 [SPARK-2443][SQL] Fix slow read from partitioned tables This fix achieves a performance boost comparable to [PR #1390](https://github.com/apache/spark/pull/1390) by moving an array update and deserializer initialization out of a potentially very long loop. Suggested by yhuai. The results below are updated for this fix.

## Benchmarks

Generated a local text file with 10M rows of simple key-value pairs. The data is loaded as a table through Hive. Results are obtained on my local machine using hive/console.

Without the fix:

Type | Non-partitioned | Partitioned (1 part)
---- | --------------- | --------------------
First run | 9.52s end-to-end (1.64s Spark job) | 36.6s (28.3s)
Stabilized runs | 1.21s (1.18s) | 27.6s (27.5s)

With this fix:

Type | Non-partitioned | Partitioned (1 part)
---- | --------------- | --------------------
First run | 9.57s (1.46s) | 11.0s (1.69s)
Stabilized runs | 1.13s (1.10s) | 1.23s (1.19s)

Author: Zongheng Yang zonghen...@gmail.com Closes #1408 from concretevitamin/slow-read-2 and squashes the following commits: d86e437 [Zongheng Yang] Move update initialization out of potentially long loop. (cherry picked from commit d60b09bb60cff106fa0acddebf35714503b20f03) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ec7d7ab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ec7d7ab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ec7d7ab Branch: refs/heads/branch-1.0 Commit: 2ec7d7ab751be67a86a048eed85bd9fd36dfaf83 Parents: baf92a0 Author: Zongheng Yang zonghen...@gmail.com Authored: Mon Jul 14 13:22:24 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Jul 14 13:22:39 2014 -0700 -- .../scala/org/apache/spark/sql/hive/TableReader.scala | 10 +++--- 1 file changed, 7 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2ec7d7ab/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 8cfde46..c394257 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -164,13 +164,17 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon hivePartitionRDD.mapPartitions { iter => val hconf = broadcastedHiveConf.value.value val rowWithPartArr = new Array[Object](2) + +// The update and deserializer initialization are intentionally +// kept out of the below iter.map loop to save performance. +rowWithPartArr.update(1, partValues) +val deserializer = localDeserializer.newInstance() +deserializer.initialize(hconf, partProps) + // Map each tuple to a row object iter.map { value => - val deserializer = localDeserializer.newInstance() - deserializer.initialize(hconf, partProps) val deserializedRow = deserializer.deserialize(value) rowWithPartArr.update(0, deserializedRow) - rowWithPartArr.update(1, partValues) rowWithPartArr.asInstanceOf[Object] } }
git commit: [SPARK-1946] Submit tasks after (configured ratio) executors have been registered
Repository: spark Updated Branches: refs/heads/master d60b09bb6 -> 3dd8af7a6 [SPARK-1946] Submit tasks after (configured ratio) executors have been registered Because submitting tasks and registering executors are asynchronous, in most situations early stages' tasks run without preferred locality. A simple workaround is to sleep for a few seconds in the application so that executors have enough time to register. This PR adds 2 configuration properties that make the TaskScheduler submit tasks only after a configured fraction of executors have been registered. \# Submit tasks only after (registered executors / total executors) has reached this ratio; default value is 0 spark.scheduler.minRegisteredExecutorsRatio = 0.8 \# Even if minRegisteredExecutorsRatio has not been reached, submit tasks once maxRegisteredWaitingTime (in milliseconds) has elapsed; default value is 3 spark.scheduler.maxRegisteredExecutorsWaitingTime = 5000 Author: li-zhihui zhihui...@intel.com Closes #900 from li-zhihui/master and squashes the following commits: b9f8326 [li-zhihui] Add logs edit docs 1ac08b1 [li-zhihui] Add new configs to user docs 22ead12 [li-zhihui] Move waitBackendReady to postStartHook c6f0522 [li-zhihui] Bug fix: numExecutors wasn't set use constant DEFAULT_NUMBER_EXECUTORS 4d6d847 [li-zhihui] Move waitBackendReady to TaskSchedulerImpl.start some code refactor 0ecee9a [li-zhihui] Move waitBackendReady from DAGScheduler.submitStage to TaskSchedulerImpl.submitTasks 4261454 [li-zhihui] Add docs for new configs code style ce0868a [li-zhihui] Code style, rename configuration property name of minRegisteredRatio maxRegisteredWaitingTime 6cfb9ec [li-zhihui] Code style, revert default minRegisteredRatio of yarn to 0, driver get --num-executors in yarn/alpha 812c33c [li-zhihui] Fix driver lost --num-executors option in yarn-cluster mode e7b6272 [li-zhihui] support yarn-cluster 37f7dc2 [li-zhihui] support yarn mode(percentage style) 3f8c941 [li-zhihui] submit stage after (configured ratio of) executors have been registered Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3dd8af7a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3dd8af7a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3dd8af7a Branch: refs/heads/master Commit: 3dd8af7a6623201c28231f4b71f59ea4e9ae29bf Parents: d60b09b Author: li-zhihui zhihui...@intel.com Authored: Mon Jul 14 15:32:49 2014 -0500 Committer: Thomas Graves tgra...@apache.org Committed: Mon Jul 14 15:32:49 2014 -0500 -- .../scala/org/apache/spark/SparkContext.scala | 11 +- .../spark/scheduler/SchedulerBackend.scala | 1 + .../spark/scheduler/TaskSchedulerImpl.scala | 15 .../cluster/CoarseGrainedSchedulerBackend.scala | 29 ++ .../cluster/SparkDeploySchedulerBackend.scala | 1 + docs/configuration.md | 19 ++ .../spark/deploy/yarn/ApplicationMaster.scala | 1 + .../yarn/ApplicationMasterArguments.scala | 6 ++- .../cluster/YarnClientClusterScheduler.scala| 2 + .../cluster/YarnClientSchedulerBackend.scala| 1 + .../cluster/YarnClusterScheduler.scala | 2 + .../cluster/YarnClusterSchedulerBackend.scala | 40 .../spark/deploy/yarn/ApplicationMaster.scala | 1 + 13 files changed, 127 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8819e73..8052499 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++
b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1531,7 +1531,16 @@ object SparkContext extends Logging { throw new SparkException("YARN mode not available ?", e) } } -val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) +val backend = try { + val clazz = + Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend") + val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) + cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] +} catch { + case e: Exception => { +throw new SparkException("YARN mode not available ?", e) + } +} scheduler.initialize(backend) scheduler http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala -- diff --git
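[Editor's note] For reference, this is how an application could opt into the two new properties once this change is available. A minimal sketch: the property names are taken from the commit message above, while the 0.8 ratio and the 30000 ms wait are purely illustrative values, not recommendations.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WaitForExecutorsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wait-for-executors")
      // Submit tasks only once 80% of the requested executors have registered ...
      .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
      // ... or after 30000 ms of waiting, whichever comes first.
      .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "30000")

    val sc = new SparkContext(conf)
    // Early stages now have a better chance of preferred locality, because
    // executors are registered before the first tasks are submitted.
    println(sc.parallelize(1 to 1000).map(_ * 2).count())
    sc.stop()
  }
}
```

With these settings, task submission is held back until either the registration ratio is reached or the waiting time expires, whichever happens first.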
git commit: [SPARK-2446][SQL] Add BinaryType support to Parquet I/O.
Repository: spark Updated Branches: refs/heads/master 3dd8af7a6 -> 9fe693b5b [SPARK-2446][SQL] Add BinaryType support to Parquet I/O. Note that this commit changes the semantics when loading in data that was created with prior versions of Spark SQL. Before, we were writing out strings as Binary data without adding any other annotations. Thus, when data is read in from prior versions, data that was StringType will now become BinaryType. Users that need strings can CAST that column to a String. It was decided that while this breaks compatibility, it does make us compatible with other systems (Hive, Thrift, etc.) and adds support for Binary data, so this is the right decision long term. To support `BinaryType`, the following changes are needed: - Make `StringType` use `OriginalType.UTF8` - Add `BinaryType` using `PrimitiveTypeName.BINARY` without `OriginalType` Author: Takuya UESHIN ues...@happy-camper.st Closes #1373 from ueshin/issues/SPARK-2446 and squashes the following commits: ecacb92 [Takuya UESHIN] Add BinaryType support to Parquet I/O. 616e04a [Takuya UESHIN] Make StringType use OriginalType.UTF8. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fe693b5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fe693b5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fe693b5 Branch: refs/heads/master Commit: 9fe693b5b6ed6af34ee1e800ab89c8a11991ea38 Parents: 3dd8af7 Author: Takuya UESHIN ues...@happy-camper.st Authored: Mon Jul 14 15:42:28 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Jul 14 15:42:35 2014 -0700 -- .../spark/sql/parquet/ParquetConverter.scala| 2 +- .../spark/sql/parquet/ParquetTableSupport.scala | 4 ++ .../spark/sql/parquet/ParquetTestData.scala | 18 +++--- .../apache/spark/sql/parquet/ParquetTypes.scala | 62 ++-- .../spark/sql/parquet/ParquetQuerySuite.scala | 16 +++-- 5 files changed, 57 insertions(+), 45 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9fe693b5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index 75748b2..de8fe2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -114,7 +114,7 @@ private[sql] object CatalystConverter { } } // All other primitive types use the default converter - case ctype: NativeType => { // note: need the type tag here! + case ctype: PrimitiveType => { // note: need the type tag here!
new CatalystPrimitiveConverter(parent, fieldIndex) } case _ => throw new RuntimeException( http://git-wip-us.apache.org/repos/asf/spark/blob/9fe693b5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 108f8b6..f1953a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -191,6 +191,8 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { value.asInstanceOf[String].getBytes("utf-8") ) ) +case BinaryType => writer.addBinary( + Binary.fromByteArray(value.asInstanceOf[Array[Byte]])) case IntegerType => writer.addInteger(value.asInstanceOf[Int]) case ShortType => writer.addInteger(value.asInstanceOf[Short]) case LongType => writer.addLong(value.asInstanceOf[Long]) @@ -299,6 +301,8 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport { record(index).asInstanceOf[String].getBytes("utf-8") ) ) + case BinaryType => writer.addBinary( +Binary.fromByteArray(record(index).asInstanceOf[Array[Byte]])) case IntegerType => writer.addInteger(record.getInt(index)) case ShortType => writer.addInteger(record.getShort(index)) case LongType => writer.addLong(record.getLong(index)) http://git-wip-us.apache.org/repos/asf/spark/blob/9fe693b5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala -- diff --git
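[Editor's note] The CAST workaround mentioned in the commit message can be applied at read time. Below is a minimal sketch against the 1.0-era SQLContext API, assuming a Parquet file written by an earlier Spark SQL release whose former string column now loads back as BinaryType; the path and the `name` column are hypothetical.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object BinaryToStringExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "binary-cast")
    val sqlContext = new SQLContext(sc)

    // Data written by an older release: the `name` column that used to be
    // StringType is now read back as BinaryType.
    sqlContext.parquetFile("/tmp/old-table.parquet").registerAsTable("old_table")

    // CAST recovers a string view of the binary column.
    val names = sqlContext.sql("SELECT CAST(name AS STRING) FROM old_table")
    names.collect().foreach(println)

    sc.stop()
  }
}
```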