spark git commit: [SPARK-9340] [SQL] Fixes converting unannotated Parquet lists
Repository: spark Updated Branches: refs/heads/branch-1.5 94692bb14 - 01efa4f27 [SPARK-9340] [SQL] Fixes converting unannotated Parquet lists This PR is inspired by #8063 authored by dguy. Especially, testing Parquet files added here are all taken from that PR. **Committer who merges this PR should attribute it to Damian Guy damian.guygmail.com.** SPARK-6776 and SPARK-6777 followed `parquet-avro` to implement backwards-compatibility rules defined in `parquet-format` spec. However, both Spark SQL and `parquet-avro` neglected the following statement in `parquet-format`: This does not affect repeated fields that are not annotated: A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor annotated by `LIST` or `MAP` should be interpreted as a required list of required elements where the element type is the type of the field. One of the consequences is that, Parquet files generated by `parquet-protobuf` containing unannotated repeated fields are not correctly converted to Catalyst arrays. This PR fixes this issue by 1. Handling unannotated repeated fields in `CatalystSchemaConverter`. 2. Converting this kind of special repeated fields to Catalyst arrays in `CatalystRowConverter`. Two special converters, `RepeatedPrimitiveConverter` and `RepeatedGroupConverter`, are added. They delegate actual conversion work to a child `elementConverter` and accumulates elements in an `ArrayBuffer`. Two extra methods, `start()` and `end()`, are added to `ParentContainerUpdater`. So that they can be used to initialize new `ArrayBuffer`s for unannotated repeated fields, and propagate converted array values to upstream. Author: Cheng Lian l...@databricks.com Closes #8070 from liancheng/spark-9340/unannotated-parquet-list and squashes the following commits: ace6df7 [Cheng Lian] Moves ParquetProtobufCompatibilitySuite f1c7bfd [Cheng Lian] Updates .rat-excludes 420ad2b [Cheng Lian] Fixes converting unannotated Parquet lists (cherry picked from commit 071bbad5db1096a548c886762b611a8484a52753) Signed-off-by: Cheng Lian l...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01efa4f2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01efa4f2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01efa4f2 Branch: refs/heads/branch-1.5 Commit: 01efa4f27db1eefba9cb0fe2dec790556f3280de Parents: 94692bb Author: Damian Guy damian@gmail.com Authored: Tue Aug 11 12:46:33 2015 +0800 Committer: Cheng Lian l...@databricks.com Committed: Tue Aug 11 12:46:54 2015 +0800 -- .rat-excludes | 1 + .../parquet/CatalystRowConverter.scala | 151 +++ .../parquet/CatalystSchemaConverter.scala | 7 +- .../test/resources/nested-array-struct.parquet | Bin 0 - 775 bytes .../src/test/resources/old-repeated-int.parquet | Bin 0 - 389 bytes .../test/resources/old-repeated-message.parquet | Bin 0 - 600 bytes .../src/test/resources/old-repeated.parquet | Bin 0 - 432 bytes .../parquet-thrift-compat.snappy.parquet| Bin .../resources/proto-repeated-string.parquet | Bin 0 - 411 bytes .../resources/proto-repeated-struct.parquet | Bin 0 - 608 bytes .../proto-struct-with-array-many.parquet| Bin 0 - 802 bytes .../resources/proto-struct-with-array.parquet | Bin 0 - 1576 bytes .../ParquetProtobufCompatibilitySuite.scala | 91 +++ .../parquet/ParquetSchemaSuite.scala| 30 14 files changed, 247 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/.rat-excludes -- diff --git a/.rat-excludes b/.rat-excludes index 
7277146..9165872 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -94,3 +94,4 @@ INDEX gen-java.* .*avpr org.apache.spark.sql.sources.DataSourceRegister +.*parquet http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index 3542dfb..ab5a6dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -21,11 +21,11 @@ import java.math.{BigDecimal, BigInteger} import java.nio.ByteOrder import scala.collection.JavaConversions._ -import
spark git commit: [SPARK-9340] [SQL] Fixes converting unannotated Parquet lists
Repository: spark Updated Branches: refs/heads/master 3c9802d94 - 071bbad5d [SPARK-9340] [SQL] Fixes converting unannotated Parquet lists This PR is inspired by #8063 authored by dguy. Especially, testing Parquet files added here are all taken from that PR. **Committer who merges this PR should attribute it to Damian Guy damian.guygmail.com.** SPARK-6776 and SPARK-6777 followed `parquet-avro` to implement backwards-compatibility rules defined in `parquet-format` spec. However, both Spark SQL and `parquet-avro` neglected the following statement in `parquet-format`: This does not affect repeated fields that are not annotated: A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor annotated by `LIST` or `MAP` should be interpreted as a required list of required elements where the element type is the type of the field. One of the consequences is that, Parquet files generated by `parquet-protobuf` containing unannotated repeated fields are not correctly converted to Catalyst arrays. This PR fixes this issue by 1. Handling unannotated repeated fields in `CatalystSchemaConverter`. 2. Converting this kind of special repeated fields to Catalyst arrays in `CatalystRowConverter`. Two special converters, `RepeatedPrimitiveConverter` and `RepeatedGroupConverter`, are added. They delegate actual conversion work to a child `elementConverter` and accumulates elements in an `ArrayBuffer`. Two extra methods, `start()` and `end()`, are added to `ParentContainerUpdater`. So that they can be used to initialize new `ArrayBuffer`s for unannotated repeated fields, and propagate converted array values to upstream. Author: Cheng Lian l...@databricks.com Closes #8070 from liancheng/spark-9340/unannotated-parquet-list and squashes the following commits: ace6df7 [Cheng Lian] Moves ParquetProtobufCompatibilitySuite f1c7bfd [Cheng Lian] Updates .rat-excludes 420ad2b [Cheng Lian] Fixes converting unannotated Parquet lists Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/071bbad5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/071bbad5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/071bbad5 Branch: refs/heads/master Commit: 071bbad5db1096a548c886762b611a8484a52753 Parents: 3c9802d Author: Damian Guy damian@gmail.com Authored: Tue Aug 11 12:46:33 2015 +0800 Committer: Cheng Lian l...@databricks.com Committed: Tue Aug 11 12:46:33 2015 +0800 -- .rat-excludes | 1 + .../parquet/CatalystRowConverter.scala | 151 +++ .../parquet/CatalystSchemaConverter.scala | 7 +- .../test/resources/nested-array-struct.parquet | Bin 0 - 775 bytes .../src/test/resources/old-repeated-int.parquet | Bin 0 - 389 bytes .../test/resources/old-repeated-message.parquet | Bin 0 - 600 bytes .../src/test/resources/old-repeated.parquet | Bin 0 - 432 bytes .../parquet-thrift-compat.snappy.parquet| Bin .../resources/proto-repeated-string.parquet | Bin 0 - 411 bytes .../resources/proto-repeated-struct.parquet | Bin 0 - 608 bytes .../proto-struct-with-array-many.parquet| Bin 0 - 802 bytes .../resources/proto-struct-with-array.parquet | Bin 0 - 1576 bytes .../ParquetProtobufCompatibilitySuite.scala | 91 +++ .../parquet/ParquetSchemaSuite.scala| 30 14 files changed, 247 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/.rat-excludes -- diff --git a/.rat-excludes b/.rat-excludes index 7277146..9165872 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -94,3 +94,4 @@ INDEX gen-java.* .*avpr 
org.apache.spark.sql.sources.DataSourceRegister +.*parquet http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index 3542dfb..ab5a6dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -21,11 +21,11 @@ import java.math.{BigDecimal, BigInteger} import java.nio.ByteOrder import scala.collection.JavaConversions._ -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.parquet.column.Dictionary import
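To make the backwards-compatibility rule quoted above concrete, here is a minimal sketch (hypothetical field name, Spark 1.5-era API) of the schema shape `parquet-protobuf` emits and the Catalyst type it should map to after this fix:

```scala
// An unannotated repeated field, as parquet-protobuf writes it:
//
//   message Msg {
//     repeated int32 f1;   // no LIST or MAP annotation
//   }
//
// Per the rule quoted above it must be read as a required list of required elements,
// i.e. a non-nullable array column whose elements are also non-nullable.
import org.apache.spark.sql.types._

val expectedCatalystSchema = StructType(Seq(
  StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = false)))

// e.g. sqlContext.read.parquet("old-repeated-int.parquet").schema is expected to match this shape.
```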
spark git commit: [SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer join
Repository: spark Updated Branches: refs/heads/master 071bbad5d - 91e9389f3 [SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer join This patch adds a new `SortMergeOuterJoin` operator that performs left and right outer joins using sort merge join. It also refactors `SortMergeJoin` in order to improve performance and code clarity. Along the way, I also performed a couple pieces of minor cleanup and optimization: - Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since it's also used for non-hash joins. - Rewrite the comment at the top of `HashJoin` to better explain the precedence for choosing join operators. - Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf settings. This patch incorporates several ideas from adrian-wang's patch, #5717. Closes #5717. !-- Reviewable:start -- [img src=https://reviewable.io/review_button.png; height=40 alt=Review on Reviewable/](https://reviewable.io/reviews/apache/spark/7904) !-- Reviewable:end -- Author: Josh Rosen joshro...@databricks.com Author: Daoyuan Wang daoyuan.w...@intel.com Closes #7904 from JoshRosen/outer-join-smj and squashes 1 commits. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91e9389f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91e9389f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91e9389f Branch: refs/heads/master Commit: 91e9389f39509e63654bd4bcb7bd919eaedda910 Parents: 071bbad Author: Josh Rosen joshro...@databricks.com Authored: Mon Aug 10 22:04:41 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Aug 10 22:04:41 2015 -0700 -- .../sql/catalyst/expressions/JoinedRow.scala| 6 +- .../scala/org/apache/spark/sql/SQLContext.scala | 2 +- .../spark/sql/execution/RowIterator.scala | 93 ++ .../spark/sql/execution/SparkStrategies.scala | 45 ++- .../joins/BroadcastNestedLoopJoin.scala | 5 +- .../sql/execution/joins/SortMergeJoin.scala | 331 +-- .../execution/joins/SortMergeOuterJoin.scala| 251 ++ .../scala/org/apache/spark/sql/JoinSuite.scala | 132 .../sql/execution/joins/InnerJoinSuite.scala| 180 ++ .../sql/execution/joins/OuterJoinSuite.scala| 310 - .../sql/execution/joins/SemiJoinSuite.scala | 125 --- .../apache/spark/sql/test/SQLTestUtils.scala| 2 +- .../org/apache/spark/sql/hive/HiveContext.scala | 2 +- 13 files changed, 1165 insertions(+), 319 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/91e9389f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala index b76757c..d3560df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala @@ -37,20 +37,20 @@ class JoinedRow extends InternalRow { } /** Updates this JoinedRow to used point at two new base rows. Returns itself. */ - def apply(r1: InternalRow, r2: InternalRow): InternalRow = { + def apply(r1: InternalRow, r2: InternalRow): JoinedRow = { row1 = r1 row2 = r2 this } /** Updates this JoinedRow by updating its left base row. Returns itself. */ - def withLeft(newLeft: InternalRow): InternalRow = { + def withLeft(newLeft: InternalRow): JoinedRow = { row1 = newLeft this } /** Updates this JoinedRow by updating its right base row. Returns itself. 
*/ - def withRight(newRight: InternalRow): InternalRow = { + def withRight(newRight: InternalRow): JoinedRow = { row2 = newRight this } http://git-wip-us.apache.org/repos/asf/spark/blob/91e9389f/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index f73bb04..4bf00b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -873,7 +873,7 @@ class SQLContext(@transient val sparkContext: SparkContext) HashAggregation :: Aggregation :: LeftSemiJoin :: - HashJoin :: + EquiJoinSelection :: InMemoryScans :: BasicOperators :: CartesianProduct ::
spark git commit: [SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer join
Repository: spark Updated Branches: refs/heads/branch-1.5 01efa4f27 - f9beef998 [SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer join This patch adds a new `SortMergeOuterJoin` operator that performs left and right outer joins using sort merge join. It also refactors `SortMergeJoin` in order to improve performance and code clarity. Along the way, I also performed a couple pieces of minor cleanup and optimization: - Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since it's also used for non-hash joins. - Rewrite the comment at the top of `HashJoin` to better explain the precedence for choosing join operators. - Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf settings. This patch incorporates several ideas from adrian-wang's patch, #5717. Closes #5717. !-- Reviewable:start -- [img src=https://reviewable.io/review_button.png; height=40 alt=Review on Reviewable/](https://reviewable.io/reviews/apache/spark/7904) !-- Reviewable:end -- Author: Josh Rosen joshro...@databricks.com Author: Daoyuan Wang daoyuan.w...@intel.com Closes #7904 from JoshRosen/outer-join-smj and squashes 1 commits. (cherry picked from commit 91e9389f39509e63654bd4bcb7bd919eaedda910) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9beef99 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9beef99 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9beef99 Branch: refs/heads/branch-1.5 Commit: f9beef9987c6a1993990e6695fb295019e5ed5d3 Parents: 01efa4f Author: Josh Rosen joshro...@databricks.com Authored: Mon Aug 10 22:04:41 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Aug 10 22:04:50 2015 -0700 -- .../sql/catalyst/expressions/JoinedRow.scala| 6 +- .../scala/org/apache/spark/sql/SQLContext.scala | 2 +- .../spark/sql/execution/RowIterator.scala | 93 ++ .../spark/sql/execution/SparkStrategies.scala | 45 ++- .../joins/BroadcastNestedLoopJoin.scala | 5 +- .../sql/execution/joins/SortMergeJoin.scala | 331 +-- .../execution/joins/SortMergeOuterJoin.scala| 251 ++ .../scala/org/apache/spark/sql/JoinSuite.scala | 132 .../sql/execution/joins/InnerJoinSuite.scala| 180 ++ .../sql/execution/joins/OuterJoinSuite.scala| 310 - .../sql/execution/joins/SemiJoinSuite.scala | 125 --- .../apache/spark/sql/test/SQLTestUtils.scala| 2 +- .../org/apache/spark/sql/hive/HiveContext.scala | 2 +- 13 files changed, 1165 insertions(+), 319 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f9beef99/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala index b76757c..d3560df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala @@ -37,20 +37,20 @@ class JoinedRow extends InternalRow { } /** Updates this JoinedRow to used point at two new base rows. Returns itself. */ - def apply(r1: InternalRow, r2: InternalRow): InternalRow = { + def apply(r1: InternalRow, r2: InternalRow): JoinedRow = { row1 = r1 row2 = r2 this } /** Updates this JoinedRow by updating its left base row. Returns itself. 
*/ - def withLeft(newLeft: InternalRow): InternalRow = { + def withLeft(newLeft: InternalRow): JoinedRow = { row1 = newLeft this } /** Updates this JoinedRow by updating its right base row. Returns itself. */ - def withRight(newRight: InternalRow): InternalRow = { + def withRight(newRight: InternalRow): JoinedRow = { row2 = newRight this } http://git-wip-us.apache.org/repos/asf/spark/blob/f9beef99/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index f73bb04..4bf00b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -873,7 +873,7 @@ class SQLContext(@transient val sparkContext: SparkContext) HashAggregation :: Aggregation :: LeftSemiJoin :: - HashJoin :: + EquiJoinSelection :: InMemoryScans ::
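For readers new to the technique, the following is a self-contained sketch of the merge step of a left outer sort merge join over inputs that are already sorted by the join key. It is conceptual only and deliberately uses plain Scala collections; the actual `SortMergeOuterJoin` operator works on `InternalRow` iterators with generated orderings and null-padded rows.

```scala
// Conceptual left-outer merge over two key-sorted sequences.
def leftOuterMerge[K: Ordering, L, R](
    left: Seq[(K, L)], right: Seq[(K, R)]): Seq[(L, Option[R])] = {
  val ord = implicitly[Ordering[K]]
  val out = scala.collection.mutable.ArrayBuffer.empty[(L, Option[R])]
  var i = 0
  var j = 0
  while (i < left.length) {
    val (lk, lv) = left(i)
    // Advance the right side until it is no longer behind the current left key.
    while (j < right.length && ord.lt(right(j)._1, lk)) j += 1
    // Emit one row per matching right key, or a single null-padded row if none match.
    var k = j
    var matched = false
    while (k < right.length && ord.equiv(right(k)._1, lk)) {
      out += ((lv, Some(right(k)._2)))
      matched = true
      k += 1
    }
    if (!matched) out += ((lv, None))
    i += 1
  }
  out.toSeq
}

// e.g. leftOuterMerge(Seq(1 -> "a", 2 -> "b"), Seq(2 -> "x", 3 -> "y"))
//      == Seq(("a", None), ("b", Some("x")))
```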
spark git commit: [SPARK-9755] [MLLIB] Add docs to MultivariateOnlineSummarizer methods
Repository: spark Updated Branches: refs/heads/branch-1.5 94b2f5b32 - 3ee2c8d16 [SPARK-9755] [MLLIB] Add docs to MultivariateOnlineSummarizer methods Adds method documentations back to `MultivariateOnlineSummarizer`, which were present in 1.4 but disappeared somewhere along the way to 1.5. jkbradley Author: Feynman Liang fli...@databricks.com Closes #8045 from feynmanliang/SPARK-9755 and squashes the following commits: af67fde [Feynman Liang] Add MultivariateOnlineSummarizer docs (cherry picked from commit 00b655cced637e1c3b750c19266086b9dcd7c158) Signed-off-by: Joseph K. Bradley jos...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ee2c8d1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ee2c8d1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ee2c8d1 Branch: refs/heads/branch-1.5 Commit: 3ee2c8d169e48e0bca3fab702466e7a855f57f8e Parents: 94b2f5b Author: Feynman Liang fli...@databricks.com Authored: Mon Aug 10 11:01:45 2015 -0700 Committer: Joseph K. Bradley jos...@databricks.com Committed: Mon Aug 10 11:01:55 2015 -0700 -- .../mllib/stat/MultivariateOnlineSummarizer.scala | 16 1 file changed, 16 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3ee2c8d1/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 62da9f2..64e4be0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -153,6 +153,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * Sample mean of each dimension. + * * @since 1.1.0 */ override def mean: Vector = { @@ -168,6 +170,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * Sample variance of each dimension. + * * @since 1.1.0 */ override def variance: Vector = { @@ -193,11 +197,15 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * Sample size. + * * @since 1.1.0 */ override def count: Long = totalCnt /** + * Number of nonzero elements in each dimension. + * * @since 1.1.0 */ override def numNonzeros: Vector = { @@ -207,6 +215,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * Maximum value of each dimension. + * * @since 1.1.0 */ override def max: Vector = { @@ -221,6 +231,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * Minimum value of each dimension. + * * @since 1.1.0 */ override def min: Vector = { @@ -235,6 +247,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * L2 (Euclidian) norm of each dimension. + * * @since 1.2.0 */ override def normL2: Vector = { @@ -252,6 +266,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * L1 norm of each dimension. + * * @since 1.2.0 */ override def normL1: Vector = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9755] [MLLIB] Add docs to MultivariateOnlineSummarizer methods
Repository: spark Updated Branches: refs/heads/master 0f3366a4c - 00b655cce [SPARK-9755] [MLLIB] Add docs to MultivariateOnlineSummarizer methods Adds method documentations back to `MultivariateOnlineSummarizer`, which were present in 1.4 but disappeared somewhere along the way to 1.5. jkbradley Author: Feynman Liang fli...@databricks.com Closes #8045 from feynmanliang/SPARK-9755 and squashes the following commits: af67fde [Feynman Liang] Add MultivariateOnlineSummarizer docs Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00b655cc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00b655cc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00b655cc Branch: refs/heads/master Commit: 00b655cced637e1c3b750c19266086b9dcd7c158 Parents: 0f3366a Author: Feynman Liang fli...@databricks.com Authored: Mon Aug 10 11:01:45 2015 -0700 Committer: Joseph K. Bradley jos...@databricks.com Committed: Mon Aug 10 11:01:45 2015 -0700 -- .../mllib/stat/MultivariateOnlineSummarizer.scala | 16 1 file changed, 16 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/00b655cc/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 62da9f2..64e4be0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -153,6 +153,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * Sample mean of each dimension. + * * @since 1.1.0 */ override def mean: Vector = { @@ -168,6 +170,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * Sample variance of each dimension. + * * @since 1.1.0 */ override def variance: Vector = { @@ -193,11 +197,15 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * Sample size. + * * @since 1.1.0 */ override def count: Long = totalCnt /** + * Number of nonzero elements in each dimension. + * * @since 1.1.0 */ override def numNonzeros: Vector = { @@ -207,6 +215,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * Maximum value of each dimension. + * * @since 1.1.0 */ override def max: Vector = { @@ -221,6 +231,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * Minimum value of each dimension. + * * @since 1.1.0 */ override def min: Vector = { @@ -235,6 +247,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * L2 (Euclidian) norm of each dimension. + * * @since 1.2.0 */ override def normL2: Vector = { @@ -252,6 +266,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S } /** + * L1 norm of each dimension. + * * @since 1.2.0 */ override def normL1: Vector = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9710] [TEST] Fix RPackageUtilsSuite when R is not available.
Repository: spark Updated Branches: refs/heads/master e3fef0f9e -> 0f3366a4c [SPARK-9710] [TEST] Fix RPackageUtilsSuite when R is not available. RUtils.isRInstalled throws an exception if R is not installed, instead of returning false. Fix that. Author: Marcelo Vanzin van...@cloudera.com Closes #8008 from vanzin/SPARK-9710 and squashes the following commits: df72d8c [Marcelo Vanzin] [SPARK-9710] [test] Fix RPackageUtilsSuite when R is not available. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f3366a4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f3366a4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f3366a4 Branch: refs/heads/master Commit: 0f3366a4c740147a7a7519922642912e2dd238f8 Parents: e3fef0f Author: Marcelo Vanzin van...@cloudera.com Authored: Mon Aug 10 10:10:40 2015 -0700 Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu Committed: Mon Aug 10 10:10:40 2015 -0700 -- core/src/main/scala/org/apache/spark/api/r/RUtils.scala | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0f3366a4/core/src/main/scala/org/apache/spark/api/r/RUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index 93b3bea..427b2bc 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -67,7 +67,11 @@ private[spark] object RUtils { /** Check if R is installed before running tests that use R commands. */ def isRInstalled: Boolean = { -val builder = new ProcessBuilder(Seq("R", "--version")) -builder.start().waitFor() == 0 +try { + val builder = new ProcessBuilder(Seq("R", "--version")) + builder.start().waitFor() == 0 +} catch { + case e: Exception => false +} } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9743] [SQL] Fixes JSONRelation refreshing
Repository: spark Updated Branches: refs/heads/branch-1.5 f75c64b0c - 94b2f5b32 [SPARK-9743] [SQL] Fixes JSONRelation refreshing PR #7696 added two `HadoopFsRelation.refresh()` calls ([this] [1], and [this] [2]) in `DataSourceStrategy` to make test case `InsertSuite.save directly to the path of a JSON table` pass. However, this forces every `HadoopFsRelation` table scan to do a refresh, which can be super expensive for tables with large number of partitions. The reason why the original test case fails without the `refresh()` calls is that, the old JSON relation builds the base RDD with the input paths, while `HadoopFsRelation` provides `FileStatus`es of leaf files. With the old JSON relation, we can create a temporary table based on a path, writing data to that, and then read newly written data without refreshing the table. This is no long true for `HadoopFsRelation`. This PR removes those two expensive refresh calls, and moves the refresh into `JSONRelation` to fix this issue. We might want to update `HadoopFsRelation` interface to provide better support for this use case. [1]: https://github.com/apache/spark/blob/ebfd91c542aaead343cb154277fcf9114382fee7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L63 [2]: https://github.com/apache/spark/blob/ebfd91c542aaead343cb154277fcf9114382fee7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L91 Author: Cheng Lian l...@databricks.com Closes #8035 from liancheng/spark-9743/fix-json-relation-refreshing and squashes the following commits: ec1957d [Cheng Lian] Fixes JSONRelation refreshing (cherry picked from commit e3fef0f9e17b1766a3869cb80ce7e4cd521cb7b6) Signed-off-by: Yin Huai yh...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94b2f5b3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94b2f5b3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94b2f5b3 Branch: refs/heads/branch-1.5 Commit: 94b2f5b3213553278ead376c24e63f019a18e793 Parents: f75c64b Author: Cheng Lian l...@databricks.com Authored: Mon Aug 10 09:07:08 2015 -0700 Committer: Yin Huai yh...@databricks.com Committed: Mon Aug 10 09:07:20 2015 -0700 -- .../datasources/DataSourceStrategy.scala | 2 -- .../org/apache/spark/sql/json/JSONRelation.scala | 19 +++ .../apache/spark/sql/sources/interfaces.scala| 2 +- .../apache/spark/sql/sources/InsertSuite.scala | 10 +- 4 files changed, 21 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/94b2f5b3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 5b5fa8c..78a4acd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -60,7 +60,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Scanning partitioned HadoopFsRelation case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) if t.partitionSpec.partitionColumns.nonEmpty = - t.refresh() val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray logInfo { @@ -88,7 +87,6 @@ private[sql] object DataSourceStrategy extends 
Strategy with Logging { // Scanning non-partitioned HadoopFsRelation case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) = - t.refresh() // See buildPartitionedTableScan for the reason that we need to create a shard // broadcast HadoopConf. val sharedHadoopConf = SparkHadoopUtil.get.conf http://git-wip-us.apache.org/repos/asf/spark/blob/94b2f5b3/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index b34a272..5bb9e62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -22,20 +22,22 @@ import java.io.CharArrayWriter import com.fasterxml.jackson.core.JsonFactory import com.google.common.base.Objects import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.{Text,
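The use case protected by the test mentioned above ("save directly to the path of a JSON table") looks roughly like the sketch below; the path and data are illustrative and the API names are the Spark 1.5 ones.

```scala
val path = "/tmp/json-table"                       // assumed to already contain JSON data
sqlContext.read.json(path).registerTempTable("jsonTable")

// Write new files straight into `path`, bypassing the table's own INSERT path...
sqlContext.range(10).selectExpr("id AS a", "id * 2 AS b")
  .write.mode("overwrite").json(path)

// ...and expect the temporary table to pick up the new files without an explicit refresh.
sqlContext.sql("SELECT a, b FROM jsonTable").show()
```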
spark git commit: Fixed AtmoicReference Example
Repository: spark Updated Branches: refs/heads/branch-1.5 3ee2c8d16 - 39493b235 Fixed AtmoicReference Example Author: Mahmoud Lababidi labab...@gmail.com Closes #8076 from lababidi/master and squashes the following commits: af4553b [Mahmoud Lababidi] Fixed AtmoicReference Example (cherry picked from commit d285212756168200383bf4df2c951bd80a492a7c) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39493b23 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39493b23 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39493b23 Branch: refs/heads/branch-1.5 Commit: 39493b235ddaa2123e14629935712c43e4a88c87 Parents: 3ee2c8d Author: Mahmoud Lababidi labab...@gmail.com Authored: Mon Aug 10 13:02:01 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Aug 10 13:02:12 2015 -0700 -- docs/streaming-kafka-integration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/39493b23/docs/streaming-kafka-integration.md -- diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md index 775d508..7571e22 100644 --- a/docs/streaming-kafka-integration.md +++ b/docs/streaming-kafka-integration.md @@ -152,7 +152,7 @@ Next, we discuss how to use this approach in your streaming application. /div div data-lang=java markdown=1 // Hold a reference to the current offset ranges, so it can be used downstream - final AtomicReferenceOffsetRange[] offsetRanges = new AtomicReference(); + final AtomicReferenceOffsetRange[] offsetRanges = new AtomicReference(); directKafkaStream.transformToPair( new FunctionJavaPairRDDString, String, JavaPairRDDString, String() { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
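The documentation section being fixed is about holding on to each batch's Kafka offset ranges from the direct stream; since the mangled diff above is hard to read, here is the general pattern sketched in Scala (the stream argument and its String/String decoder types are assumptions, not part of the commit):

```scala
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Hold a reference to the current offset ranges so they can be used downstream.
def captureOffsets(directKafkaStream: DStream[(String, String)])
  : (DStream[(String, String)], AtomicReference[Array[OffsetRange]]) = {
  val offsetRanges = new AtomicReference[Array[OffsetRange]](Array.empty[OffsetRange])
  val withCapture = directKafkaStream.transform { rdd =>
    offsetRanges.set(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
    rdd
  }
  (withCapture, offsetRanges)
}
```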
[1/2] spark git commit: Preparing Spark release v1.5.0-snapshot-20150810
Repository: spark Updated Branches: refs/heads/branch-1.5 d17303a94 - e51779c17 Preparing Spark release v1.5.0-snapshot-20150810 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22031493 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22031493 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22031493 Branch: refs/heads/branch-1.5 Commit: 220314935e0aa3c910ac5e22a38e2b6882a5d1e8 Parents: d17303a Author: Patrick Wendell pwend...@gmail.com Authored: Mon Aug 10 13:56:50 2015 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Mon Aug 10 13:56:50 2015 -0700 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 32 files changed, 32 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index e9c6d26..3ef7d6f 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0-SNAPSHOT/version +version1.5.0/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index ed5c37e..684e07b 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0-SNAPSHOT/version +version1.5.0/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 0e53a79..bb25652 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0-SNAPSHOT/version +version1.5.0/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index e6884b0..9ef1eda 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0-SNAPSHOT/version +version1.5.0/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 1318959..6377c3e 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId 
-version1.5.0-SNAPSHOT/version +version1.5.0/version relativePath../../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b
spark git commit: [SPARK-9784] [SQL] Exchange.isUnsafe should check whether codegen and unsafe are enabled
Repository: spark Updated Branches: refs/heads/branch-1.5 39493b235 -> d251d9ff0 [SPARK-9784] [SQL] Exchange.isUnsafe should check whether codegen and unsafe are enabled Exchange.isUnsafe should check whether codegen and unsafe are enabled. Author: Josh Rosen joshro...@databricks.com Closes #8073 from JoshRosen/SPARK-9784 and squashes the following commits: 7a1019f [Josh Rosen] [SPARK-9784] Exchange.isUnsafe should check whether codegen and unsafe are enabled (cherry picked from commit 0fe66744f16854fc8cd8a72174de93a788e3cf6c) Signed-off-by: Josh Rosen joshro...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d251d9ff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d251d9ff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d251d9ff Branch: refs/heads/branch-1.5 Commit: d251d9ff000363665aa6faeb2199a48dc5970ca2 Parents: 39493b2 Author: Josh Rosen joshro...@databricks.com Authored: Mon Aug 10 13:05:03 2015 -0700 Committer: Josh Rosen joshro...@databricks.com Committed: Mon Aug 10 13:05:24 2015 -0700 -- .../src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d251d9ff/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index b89e634..029f226 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -46,7 +46,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una * Returns true iff we can support the data type, and we are not doing range partitioning. */ private lazy val tungstenMode: Boolean = { -GenerateUnsafeProjection.canSupport(child.schema) && +unsafeEnabled && codegenEnabled && GenerateUnsafeProjection.canSupport(child.schema) && !newPartitioning.isInstanceOf[RangePartitioning] } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9784] [SQL] Exchange.isUnsafe should check whether codegen and unsafe are enabled
Repository: spark Updated Branches: refs/heads/master d28521275 -> 0fe66744f [SPARK-9784] [SQL] Exchange.isUnsafe should check whether codegen and unsafe are enabled Exchange.isUnsafe should check whether codegen and unsafe are enabled. Author: Josh Rosen joshro...@databricks.com Closes #8073 from JoshRosen/SPARK-9784 and squashes the following commits: 7a1019f [Josh Rosen] [SPARK-9784] Exchange.isUnsafe should check whether codegen and unsafe are enabled Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fe66744 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fe66744 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fe66744 Branch: refs/heads/master Commit: 0fe66744f16854fc8cd8a72174de93a788e3cf6c Parents: d285212 Author: Josh Rosen joshro...@databricks.com Authored: Mon Aug 10 13:05:03 2015 -0700 Committer: Josh Rosen joshro...@databricks.com Committed: Mon Aug 10 13:05:03 2015 -0700 -- .../src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0fe66744/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index b89e634..029f226 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -46,7 +46,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una * Returns true iff we can support the data type, and we are not doing range partitioning. */ private lazy val tungstenMode: Boolean = { -GenerateUnsafeProjection.canSupport(child.schema) && +unsafeEnabled && codegenEnabled && GenerateUnsafeProjection.canSupport(child.schema) && !newPartitioning.isInstanceOf[RangePartitioning] } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
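The two settings consulted by the fixed check can be toggled through `SQLContext.setConf`; the key names below are the ones used around Spark 1.5 and should be treated as assumptions for other versions.

```scala
// Both must be enabled (together with a supported schema and a non-range partitioning)
// for Exchange to take the Unsafe/Tungsten code path after this fix.
sqlContext.setConf("spark.sql.unsafe.enabled", "true")
sqlContext.setConf("spark.sql.codegen", "true")
```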
spark git commit: [SPARK-9633] [BUILD] SBT download locations outdated; need an update
Repository: spark Updated Branches: refs/heads/branch-1.2 1c38d4254 -> e23843c06 [SPARK-9633] [BUILD] SBT download locations outdated; need an update Remove 2 defunct SBT download URLs and replace with the 1 known download URL. Also, use https. Follow up on https://github.com/apache/spark/pull/7792 Author: Sean Owen so...@cloudera.com Closes #7956 from srowen/SPARK-9633 and squashes the following commits: caa40bd [Sean Owen] Remove 2 defunct SBT download URLs and replace with the 1 known download URL. Also, use https. Conflicts: sbt/sbt-launch-lib.bash Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e23843c0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e23843c0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e23843c0 Branch: refs/heads/branch-1.2 Commit: e23843c0603bb7d278ba21abde0dd92661b2fac8 Parents: 1c38d42 Author: Sean Owen so...@cloudera.com Authored: Thu Aug 6 23:43:52 2015 +0100 Committer: Davies Liu davies@gmail.com Committed: Mon Aug 10 13:10:40 2015 -0700 -- sbt/sbt-launch-lib.bash | 7 +++ 1 file changed, 3 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e23843c0/sbt/sbt-launch-lib.bash -- diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash index 055e206..eb6b415 100755 --- a/sbt/sbt-launch-lib.bash +++ b/sbt/sbt-launch-lib.bash @@ -38,8 +38,7 @@ dlog () { acquire_sbt_jar () { SBT_VERSION=`awk -F = '/sbt\\.version/ {print $2}' ./project/build.properties` - URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar - URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar + URL1=https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar JAR=sbt/sbt-launch-${SBT_VERSION}.jar sbt_jar=$JAR @@ -51,9 +50,9 @@ acquire_sbt_jar () { printf "Attempting to fetch sbt\n" JAR_DL=${JAR}.part if hash curl 2>/dev/null; then - (curl --silent ${URL1} > ${JAR_DL} || curl --silent ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR} + curl --fail --location --silent ${URL1} > ${JAR_DL} && mv ${JAR_DL} ${JAR} elif hash wget 2>/dev/null; then - (wget --quiet ${URL1} -O ${JAR_DL} || wget --quiet ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR} + wget --quiet ${URL1} -O ${JAR_DL} && mv ${JAR_DL} ${JAR} else printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n" exit -1 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9620] [SQL] generated UnsafeProjection should support many columns or large exressions
Repository: spark Updated Branches: refs/heads/branch-1.5 c1838e430 - 23842482f [SPARK-9620] [SQL] generated UnsafeProjection should support many columns or large exressions Currently, generated UnsafeProjection can reach 64k byte code limit of Java. This patch will split the generated expressions into multiple functions, to avoid the limitation. After this patch, we can work well with table that have up to 64k columns (hit max number of constants limit in Java), it should be enough in practice. cc rxin Author: Davies Liu dav...@databricks.com Closes #8044 from davies/wider_table and squashes the following commits: 9192e6c [Davies Liu] fix generated safe projection d1ef81a [Davies Liu] fix failed tests 737b3d3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table ffcd132 [Davies Liu] address comments 1b95be4 [Davies Liu] put the generated class into sql package 77ed72d [Davies Liu] address comments 4518e17 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table 75ccd01 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table 495e932 [Davies Liu] support wider table with more than 1k columns for generated projections (cherry picked from commit fe2fb7fb7189d183a4273ad27514af4b6b461f26) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23842482 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23842482 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23842482 Branch: refs/heads/branch-1.5 Commit: 23842482f46b45d90ba32ce406675cdb5f88c537 Parents: c1838e4 Author: Davies Liu dav...@databricks.com Authored: Mon Aug 10 13:52:18 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Aug 10 13:52:27 2015 -0700 -- .../expressions/codegen/CodeGenerator.scala | 48 +++- .../codegen/GenerateMutableProjection.scala | 43 +-- .../codegen/GenerateSafeProjection.scala| 52 ++-- .../codegen/GenerateUnsafeProjection.scala | 122 ++- .../codegen/GenerateUnsafeRowJoiner.scala | 2 +- .../codegen/GeneratedProjectionSuite.scala | 82 + 6 files changed, 207 insertions(+), 142 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/23842482/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 7b41c9a..c21f4d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.language.existentials import com.google.common.cache.{CacheBuilder, CacheLoader} @@ -265,6 +266,45 @@ class CodeGenContext { def isPrimitiveType(jt: String): Boolean = primitiveTypes.contains(jt) def isPrimitiveType(dt: DataType): Boolean = isPrimitiveType(javaType(dt)) + + /** + * Splits the generated code of expressions into multiple functions, because function has + * 64kb code size limit in JVM + * + * @param row the variable name of row that is used by expressions + */ + def splitExpressions(row: String, expressions: Seq[String]): String = { +val blocks = new 
ArrayBuffer[String]() +val blockBuilder = new StringBuilder() +for (code - expressions) { + // We can't know how many byte code will be generated, so use the number of bytes as limit + if (blockBuilder.length 64 * 1000) { +blocks.append(blockBuilder.toString()) +blockBuilder.clear() + } + blockBuilder.append(code) +} +blocks.append(blockBuilder.toString()) + +if (blocks.length == 1) { + // inline execution if only one block + blocks.head +} else { + val apply = freshName(apply) + val functions = blocks.zipWithIndex.map { case (body, i) = +val name = s${apply}_$i +val code = s + |private void $name(InternalRow $row) { + | $body + |} + .stripMargin + addNewFunction(name, code) + name + } + + functions.map(name = s$name($row);).mkString(\n) +} + } } /** @@ -289,15 +329,15 @@ abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Loggin protected def declareMutableStates(ctx:
spark git commit: [SPARK-9620] [SQL] generated UnsafeProjection should support many columns or large exressions
Repository: spark Updated Branches: refs/heads/master 40ed2af58 - fe2fb7fb7 [SPARK-9620] [SQL] generated UnsafeProjection should support many columns or large exressions Currently, generated UnsafeProjection can reach 64k byte code limit of Java. This patch will split the generated expressions into multiple functions, to avoid the limitation. After this patch, we can work well with table that have up to 64k columns (hit max number of constants limit in Java), it should be enough in practice. cc rxin Author: Davies Liu dav...@databricks.com Closes #8044 from davies/wider_table and squashes the following commits: 9192e6c [Davies Liu] fix generated safe projection d1ef81a [Davies Liu] fix failed tests 737b3d3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table ffcd132 [Davies Liu] address comments 1b95be4 [Davies Liu] put the generated class into sql package 77ed72d [Davies Liu] address comments 4518e17 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table 75ccd01 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table 495e932 [Davies Liu] support wider table with more than 1k columns for generated projections Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe2fb7fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe2fb7fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe2fb7fb Branch: refs/heads/master Commit: fe2fb7fb7189d183a4273ad27514af4b6b461f26 Parents: 40ed2af Author: Davies Liu dav...@databricks.com Authored: Mon Aug 10 13:52:18 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Aug 10 13:52:18 2015 -0700 -- .../expressions/codegen/CodeGenerator.scala | 48 +++- .../codegen/GenerateMutableProjection.scala | 43 +-- .../codegen/GenerateSafeProjection.scala| 52 ++-- .../codegen/GenerateUnsafeProjection.scala | 122 ++- .../codegen/GenerateUnsafeRowJoiner.scala | 2 +- .../codegen/GeneratedProjectionSuite.scala | 82 + 6 files changed, 207 insertions(+), 142 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fe2fb7fb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 7b41c9a..c21f4d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.language.existentials import com.google.common.cache.{CacheBuilder, CacheLoader} @@ -265,6 +266,45 @@ class CodeGenContext { def isPrimitiveType(jt: String): Boolean = primitiveTypes.contains(jt) def isPrimitiveType(dt: DataType): Boolean = isPrimitiveType(javaType(dt)) + + /** + * Splits the generated code of expressions into multiple functions, because function has + * 64kb code size limit in JVM + * + * @param row the variable name of row that is used by expressions + */ + def splitExpressions(row: String, expressions: Seq[String]): String = { +val blocks = new ArrayBuffer[String]() +val blockBuilder = new StringBuilder() +for (code - expressions) { + // We can't know how many byte 
code will be generated, so use the number of bytes as limit + if (blockBuilder.length 64 * 1000) { +blocks.append(blockBuilder.toString()) +blockBuilder.clear() + } + blockBuilder.append(code) +} +blocks.append(blockBuilder.toString()) + +if (blocks.length == 1) { + // inline execution if only one block + blocks.head +} else { + val apply = freshName(apply) + val functions = blocks.zipWithIndex.map { case (body, i) = +val name = s${apply}_$i +val code = s + |private void $name(InternalRow $row) { + | $body + |} + .stripMargin + addNewFunction(name, code) + name + } + + functions.map(name = s$name($row);).mkString(\n) +} + } } /** @@ -289,15 +329,15 @@ abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Loggin protected def declareMutableStates(ctx: CodeGenContext): String = { ctx.mutableStates.map { case (javaType, variableName, _) = sprivate $javaType $variableName; -
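To see why the split matters, a wide projection such as the sketch below could previously make the single generated method too large for the JVM; the column count is illustrative only, and whether a given width overflows depends on the expressions involved.

```scala
import org.apache.spark.sql.functions.{col, lit}

// Build a DataFrame with a few thousand projected columns to exercise the wide-schema path.
val base = sqlContext.range(10).toDF("c0")
val extraCols = (1 until 3000).map(i => lit(i).as(s"c$i"))
val wide = base.select((col("c0") +: extraCols): _*)

// Before this patch this style of query could fail with an error along the lines of
// "Code of method ... grows beyond 64 KB"; with the generated code split into helper
// functions it is expected to run.
wide.collect()
```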
spark git commit: [SPARK-9633] [BUILD] SBT download locations outdated; need an update
Repository: spark Updated Branches: refs/heads/branch-1.0 d304ef4a9 -> 117843f85 [SPARK-9633] [BUILD] SBT download locations outdated; need an update Remove 2 defunct SBT download URLs and replace with the 1 known download URL. Also, use https. Follow up on https://github.com/apache/spark/pull/7792 Author: Sean Owen so...@cloudera.com Closes #7956 from srowen/SPARK-9633 and squashes the following commits: caa40bd [Sean Owen] Remove 2 defunct SBT download URLs and replace with the 1 known download URL. Also, use https. Conflicts: sbt/sbt-launch-lib.bash Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/117843f8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/117843f8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/117843f8 Branch: refs/heads/branch-1.0 Commit: 117843f85e8e69a43ffa531716a37c8c05dbabb0 Parents: d304ef4 Author: Sean Owen so...@cloudera.com Authored: Thu Aug 6 23:43:52 2015 +0100 Committer: Davies Liu davies@gmail.com Committed: Mon Aug 10 13:15:41 2015 -0700 -- sbt/sbt-launch-lib.bash | 7 +++ 1 file changed, 3 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/117843f8/sbt/sbt-launch-lib.bash -- diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash index 64e40a8..2fbb060 100755 --- a/sbt/sbt-launch-lib.bash +++ b/sbt/sbt-launch-lib.bash @@ -37,8 +37,7 @@ dlog () { acquire_sbt_jar () { SBT_VERSION=`awk -F = '/sbt\\.version/ {print $2}' ./project/build.properties` - URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar - URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar + URL1=https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar JAR=sbt/sbt-launch-${SBT_VERSION}.jar sbt_jar=$JAR @@ -50,9 +49,9 @@ acquire_sbt_jar () { printf "Attempting to fetch sbt\n" JAR_DL=${JAR}.part if hash curl 2>/dev/null; then - (curl --progress-bar ${URL1} > ${JAR_DL} || curl --progress-bar ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR} + curl --progress-bar --fail --location ${URL1} > ${JAR_DL} && mv ${JAR_DL} ${JAR} elif hash wget 2>/dev/null; then - (wget --progress=bar ${URL1} -O ${JAR_DL} || wget --progress=bar ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR} + wget --progress=bar ${URL1} -O ${JAR_DL} && mv ${JAR_DL} ${JAR} else printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n" exit -1 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9759] [SQL] improve decimal.times() and cast(int, decimalType)
Repository: spark Updated Branches: refs/heads/master fe2fb7fb7 - c4fd2a242 [SPARK-9759] [SQL] improve decimal.times() and cast(int, decimalType) This patch optimize two things: 1. passing MathContext to JavaBigDecimal.multiply/divide/reminder to do right rounding, because java.math.BigDecimal.apply(MathContext) is expensive 2. Cast integer/short/byte to decimal directly (without double) This two optimizations could speed up the end-to-end time of a aggregation (SUM(short * decimal(5, 2)) 75% (from 19s - 10.8s) Author: Davies Liu dav...@databricks.com Closes #8052 from davies/optimize_decimal and squashes the following commits: 225efad [Davies Liu] improve decimal.times() and cast(int, decimalType) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4fd2a24 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4fd2a24 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4fd2a24 Branch: refs/heads/master Commit: c4fd2a242228ee101904770446e3f37d49e39b76 Parents: fe2fb7f Author: Davies Liu dav...@databricks.com Authored: Mon Aug 10 13:55:11 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Aug 10 13:55:11 2015 -0700 -- .../spark/sql/catalyst/expressions/Cast.scala | 42 +++- .../org/apache/spark/sql/types/Decimal.scala| 12 +++--- 2 files changed, 22 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c4fd2a24/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 946c5a9..616b9e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -155,7 +155,7 @@ case class Cast(child: Expression, dataType: DataType) case ByteType = buildCast[Byte](_, _ != 0) case DecimalType() = - buildCast[Decimal](_, _ != Decimal.ZERO) + buildCast[Decimal](_, !_.isZero) case DoubleType = buildCast[Double](_, _ != 0) case FloatType = @@ -315,13 +315,13 @@ case class Cast(child: Expression, dataType: DataType) case TimestampType = // Note that we lose precision here. 
buildCast[Long](_, t = changePrecision(Decimal(timestampToDouble(t)), target)) -case DecimalType() = +case dt: DecimalType = b = changePrecision(b.asInstanceOf[Decimal].clone(), target) -case LongType = - b = changePrecision(Decimal(b.asInstanceOf[Long]), target) -case x: NumericType = // All other numeric types can be represented precisely as Doubles +case t: IntegralType = + b = changePrecision(Decimal(t.integral.asInstanceOf[Integral[Any]].toLong(b)), target) +case x: FractionalType = b = try { - changePrecision(Decimal(x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)), target) + changePrecision(Decimal(x.fractional.asInstanceOf[Fractional[Any]].toDouble(b)), target) } catch { case _: NumberFormatException = null } @@ -534,10 +534,7 @@ case class Cast(child: Expression, dataType: DataType) (c, evPrim, evNull) = s try { - org.apache.spark.sql.types.Decimal tmpDecimal = -new org.apache.spark.sql.types.Decimal().set( - new scala.math.BigDecimal( -new java.math.BigDecimal($c.toString(; + Decimal tmpDecimal = Decimal.apply(new java.math.BigDecimal($c.toString())); ${changePrecision(tmpDecimal, target, evPrim, evNull)} } catch (java.lang.NumberFormatException e) { $evNull = true; @@ -546,12 +543,7 @@ case class Cast(child: Expression, dataType: DataType) case BooleanType = (c, evPrim, evNull) = s -org.apache.spark.sql.types.Decimal tmpDecimal = null; -if ($c) { - tmpDecimal = new org.apache.spark.sql.types.Decimal().set(1); -} else { - tmpDecimal = new org.apache.spark.sql.types.Decimal().set(0); -} +Decimal tmpDecimal = $c ? Decimal.apply(1) : Decimal.apply(0); ${changePrecision(tmpDecimal, target, evPrim, evNull)} case DateType = @@ -561,32 +553,28 @@ case class Cast(child: Expression, dataType: DataType) // Note that we lose precision here. (c, evPrim, evNull) = s -org.apache.spark.sql.types.Decimal tmpDecimal = - new org.apache.spark.sql.types.Decimal().set( -
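The gist of optimization 2 above is easiest to see outside the Cast machinery. A minimal sketch (not the Spark source; the value of `i` is made up for illustration): integral inputs now reach Decimal through a Long, which is exact, instead of being routed through a Double first.

    import org.apache.spark.sql.types.Decimal

    val i: Int = 123456789
    // New path: integer/short/byte go through Long, so no precision is lost
    // and the extra widening to Double disappears.
    val direct = Decimal(i.toLong)
    // Old path: every non-Long numeric type was first widened to Double,
    // which is slower and can round values near the top of the Long range.
    val viaDouble = Decimal(i.toDouble)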
spark git commit: [SPARK-9759] [SQL] improve decimal.times() and cast(int, decimalType)
Repository: spark Updated Branches: refs/heads/branch-1.5 23842482f - d17303a94 [SPARK-9759] [SQL] improve decimal.times() and cast(int, decimalType) This patch optimize two things: 1. passing MathContext to JavaBigDecimal.multiply/divide/reminder to do right rounding, because java.math.BigDecimal.apply(MathContext) is expensive 2. Cast integer/short/byte to decimal directly (without double) This two optimizations could speed up the end-to-end time of a aggregation (SUM(short * decimal(5, 2)) 75% (from 19s - 10.8s) Author: Davies Liu dav...@databricks.com Closes #8052 from davies/optimize_decimal and squashes the following commits: 225efad [Davies Liu] improve decimal.times() and cast(int, decimalType) (cherry picked from commit c4fd2a242228ee101904770446e3f37d49e39b76) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d17303a9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d17303a9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d17303a9 Branch: refs/heads/branch-1.5 Commit: d17303a94820ded970030968006ecabe76820278 Parents: 2384248 Author: Davies Liu dav...@databricks.com Authored: Mon Aug 10 13:55:11 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Aug 10 13:55:19 2015 -0700 -- .../spark/sql/catalyst/expressions/Cast.scala | 42 +++- .../org/apache/spark/sql/types/Decimal.scala| 12 +++--- 2 files changed, 22 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d17303a9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 946c5a9..616b9e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -155,7 +155,7 @@ case class Cast(child: Expression, dataType: DataType) case ByteType = buildCast[Byte](_, _ != 0) case DecimalType() = - buildCast[Decimal](_, _ != Decimal.ZERO) + buildCast[Decimal](_, !_.isZero) case DoubleType = buildCast[Double](_, _ != 0) case FloatType = @@ -315,13 +315,13 @@ case class Cast(child: Expression, dataType: DataType) case TimestampType = // Note that we lose precision here. 
buildCast[Long](_, t = changePrecision(Decimal(timestampToDouble(t)), target)) -case DecimalType() = +case dt: DecimalType = b = changePrecision(b.asInstanceOf[Decimal].clone(), target) -case LongType = - b = changePrecision(Decimal(b.asInstanceOf[Long]), target) -case x: NumericType = // All other numeric types can be represented precisely as Doubles +case t: IntegralType = + b = changePrecision(Decimal(t.integral.asInstanceOf[Integral[Any]].toLong(b)), target) +case x: FractionalType = b = try { - changePrecision(Decimal(x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)), target) + changePrecision(Decimal(x.fractional.asInstanceOf[Fractional[Any]].toDouble(b)), target) } catch { case _: NumberFormatException = null } @@ -534,10 +534,7 @@ case class Cast(child: Expression, dataType: DataType) (c, evPrim, evNull) = s try { - org.apache.spark.sql.types.Decimal tmpDecimal = -new org.apache.spark.sql.types.Decimal().set( - new scala.math.BigDecimal( -new java.math.BigDecimal($c.toString(; + Decimal tmpDecimal = Decimal.apply(new java.math.BigDecimal($c.toString())); ${changePrecision(tmpDecimal, target, evPrim, evNull)} } catch (java.lang.NumberFormatException e) { $evNull = true; @@ -546,12 +543,7 @@ case class Cast(child: Expression, dataType: DataType) case BooleanType = (c, evPrim, evNull) = s -org.apache.spark.sql.types.Decimal tmpDecimal = null; -if ($c) { - tmpDecimal = new org.apache.spark.sql.types.Decimal().set(1); -} else { - tmpDecimal = new org.apache.spark.sql.types.Decimal().set(0); -} +Decimal tmpDecimal = $c ? Decimal.apply(1) : Decimal.apply(0); ${changePrecision(tmpDecimal, target, evPrim, evNull)} case DateType = @@ -561,32 +553,28 @@ case class Cast(child: Expression, dataType: DataType) // Note that we lose precision here. (c, evPrim, evNull) = s -
[05/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala new file mode 100644 index 000..6b62c9a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.json + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext + +trait TestJsonData { + + protected def ctx: SQLContext + + def primitiveFieldAndType: RDD[String] = +ctx.sparkContext.parallelize( + {string:this is a simple string., + integer:10, + long:21474836470, + bigInteger:92233720368547758070, + double:1.7976931348623157E308, + boolean:true, + null:null + } :: Nil) + + def primitiveFieldValueTypeConflict: RDD[String] = +ctx.sparkContext.parallelize( + {num_num_1:11, num_num_2:null, num_num_3: 1.1, + num_bool:true, num_str:13.1, str_bool:str1} :: + {num_num_1:null, num_num_2:21474836470.9, num_num_3: null, + num_bool:12, num_str:null, str_bool:true} :: + {num_num_1:21474836470, num_num_2:92233720368547758070, num_num_3: 100, + num_bool:false, num_str:str1, str_bool:false} :: + {num_num_1:21474836570, num_num_2:1.1, num_num_3: 21474836470, + num_bool:null, num_str:92233720368547758070, str_bool:null} :: Nil) + + def jsonNullStruct: RDD[String] = +ctx.sparkContext.parallelize( + {nullstr:,ip:27.31.100.29,headers:{Host:1.abc.com,Charset:UTF-8}} :: +{nullstr:,ip:27.31.100.29,headers:{}} :: +{nullstr:,ip:27.31.100.29,headers:} :: +{nullstr:null,ip:27.31.100.29,headers:null} :: Nil) + + def complexFieldValueTypeConflict: RDD[String] = +ctx.sparkContext.parallelize( + {num_struct:11, str_array:[1, 2, 3], + array:[], struct_array:[], struct: {}} :: + {num_struct:{field:false}, str_array:null, + array:null, struct_array:{}, struct: null} :: + {num_struct:null, str_array:str, + array:[4, 5, 6], struct_array:[7, 8, 9], struct: {field:null}} :: + {num_struct:{}, str_array:[str1, str2, 33], + array:[7], struct_array:{field: true}, struct: {field: str}} :: Nil) + + def arrayElementTypeConflict: RDD[String] = +ctx.sparkContext.parallelize( + {array1: [1, 1.1, true, null, [], {}, [2,3,4], {field:str}], + array2: [{field:214748364700}, {field:1}]} :: + {array3: [{field:str}, {field:1}]} :: + {array3: [1, 2, 3]} :: Nil) + + def missingFields: RDD[String] = +ctx.sparkContext.parallelize( + {a:true} :: + {b:21474836470} :: + {c:[33, 44]} :: + {d:{field:true}} :: + {e:str} :: Nil) + + def complexFieldAndType1: RDD[String] = 
+ctx.sparkContext.parallelize( + {struct:{field1: true, field2: 92233720368547758070}, + structWithArrayFields:{field1:[4, 5, 6], field2:[str1, str2]}, + arrayOfString:[str1, str2], + arrayOfInteger:[1, 2147483647, -2147483648], + arrayOfLong:[21474836470, 9223372036854775807, -9223372036854775808], + arrayOfBigInteger:[922337203685477580700, -922337203685477580800], + arrayOfDouble:[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], + arrayOfBoolean:[true, false, true], + arrayOfNull:[null, null, null, null], + arrayOfStruct:[{field1: true, field2: str1}, {field1: false}, {field3: null}], + arrayOfArray1:[[1, 2, 3], [str1, str2]], + arrayOfArray2:[[1, 2, 3], [1.1, 2.1, 3.1]] + } :: Nil) + + def complexFieldAndType2: RDD[String] = +ctx.sparkContext.parallelize( + {arrayOfStruct:[{field1: true, field2: str1}, {field1: false}, {field3: null}], + complexArrayOfStruct: [ + { +field1: [ +
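A minimal sketch of how a TestJsonData-style fixture is typically consumed by the JSON test suites (illustrative only; `ctx` is the trait's SQLContext and `primitiveFieldAndType` is one of the RDD[String] methods listed above):

    // Parse the RDD of JSON documents into a DataFrame and inspect the inferred schema.
    val df = ctx.read.json(primitiveFieldAndType)
    df.printSchema()
    df.show()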
[2/2] spark git commit: Preparing development version 1.5.0-SNAPSHOT
Preparing development version 1.5.0-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e51779c1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e51779c1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e51779c1 Branch: refs/heads/branch-1.5 Commit: e51779c17877a5c715ff26d6b174c6535fb20a4c Parents: 2203149 Author: Patrick Wendell pwend...@gmail.com Authored: Mon Aug 10 13:56:56 2015 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Mon Aug 10 13:56:56 2015 -0700 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 32 files changed, 32 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 3ef7d6f..e9c6d26 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0/version +version1.5.0-SNAPSHOT/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 684e07b..ed5c37e 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0/version +version1.5.0-SNAPSHOT/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index bb25652..0e53a79 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0/version +version1.5.0-SNAPSHOT/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 9ef1eda..e6884b0 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0/version +version1.5.0-SNAPSHOT/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 6377c3e..1318959 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0/version +version1.5.0-SNAPSHOT/version relativePath../../pom.xml/relativePath /parent 
http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 7d72f78..0664cfb 100644 ---
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.5.0-snapshot-20150810 [created] 220314935
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.5.0-snapshot-20150803 [deleted] 7e7147f3b
[2/2] spark git commit: Preparing development version 1.5.0-SNAPSHOT
Preparing development version 1.5.0-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e4f58ed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e4f58ed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e4f58ed Branch: refs/heads/branch-1.5 Commit: 0e4f58ed9b63b7eade96b8e4b2c087b182915b40 Parents: 3369ad9 Author: Patrick Wendell pwend...@gmail.com Authored: Mon Aug 10 14:26:56 2015 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Mon Aug 10 14:26:56 2015 -0700 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 32 files changed, 32 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 3ef7d6f..e9c6d26 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0/version +version1.5.0-SNAPSHOT/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 684e07b..ed5c37e 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0/version +version1.5.0-SNAPSHOT/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index bb25652..0e53a79 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0/version +version1.5.0-SNAPSHOT/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 9ef1eda..e6884b0 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0/version +version1.5.0-SNAPSHOT/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 6377c3e..1318959 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0/version +version1.5.0-SNAPSHOT/version relativePath../../pom.xml/relativePath /parent 
http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 7d72f78..0664cfb 100644 ---
[1/2] spark git commit: Preparing Spark release v1.5.0-snapshot-20150810
Repository: spark Updated Branches: refs/heads/branch-1.5 e51779c17 - 0e4f58ed9 Preparing Spark release v1.5.0-snapshot-20150810 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3369ad9b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3369ad9b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3369ad9b Branch: refs/heads/branch-1.5 Commit: 3369ad9bcba73737b5b055a4e76b18f2a4da2c5d Parents: e51779c Author: Patrick Wendell pwend...@gmail.com Authored: Mon Aug 10 14:26:49 2015 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Mon Aug 10 14:26:49 2015 -0700 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 32 files changed, 32 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index e9c6d26..3ef7d6f 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0-SNAPSHOT/version +version1.5.0/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index ed5c37e..684e07b 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0-SNAPSHOT/version +version1.5.0/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 0e53a79..bb25652 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0-SNAPSHOT/version +version1.5.0/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index e6884b0..9ef1eda 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId -version1.5.0-SNAPSHOT/version +version1.5.0/version relativePath../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 1318959..6377c3e 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ parent groupIdorg.apache.spark/groupId artifactIdspark-parent_2.10/artifactId 
-version1.5.0-SNAPSHOT/version +version1.5.0/version relativePath../../pom.xml/relativePath /parent http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.5.0-snapshot-20150810 [created] 3369ad9bc
spark git commit: [SPARK-9737] [YARN] Add the suggested configuration when required executor memory is above the max threshold of this cluster on YARN mode
Repository: spark Updated Branches: refs/heads/branch-1.5 0e4f58ed9 - 51406becc [SPARK-9737] [YARN] Add the suggested configuration when required executor memory is above the max threshold of this cluster on YARN mode Author: Yadong Qi qiyadong2...@gmail.com Closes #8028 from watermen/SPARK-9737 and squashes the following commits: 48bdf3d [Yadong Qi] Add suggested configuration. (cherry picked from commit 86fa4ba6d13f909cb508b7cb3b153d586fe59bc3) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51406bec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51406bec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51406bec Branch: refs/heads/branch-1.5 Commit: 51406becc78857dbc36c3330f3afe8dd5926dc2f Parents: 0e4f58e Author: Yadong Qi qiyadong2...@gmail.com Authored: Sun Aug 9 19:54:05 2015 +0100 Committer: Reynold Xin r...@databricks.com Committed: Mon Aug 10 15:40:01 2015 -0700 -- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/51406bec/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index fc11bbf..b4ba3f0 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -203,12 +203,14 @@ private[spark] class Client( val executorMem = args.executorMemory + executorMemoryOverhead if (executorMem maxMem) { throw new IllegalArgumentException(sRequired executor memory (${args.executorMemory} + -s+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!) +s+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! + +Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.) } val amMem = args.amMemory + amMemoryOverhead if (amMem maxMem) { throw new IllegalArgumentException(sRequired AM memory (${args.amMemory} + -s+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!) +s+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! + +Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.) } logInfo(Will allocate AM container, with %d MB memory including %d MB overhead.format( amMem, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
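The check this patch touches boils down to the following pattern. A minimal standalone sketch (method and variable names are assumed, not taken from Client.scala): the requested memory plus its overhead is compared against YARN's maximum container allocation, and the error message now points at the YARN setting to raise.

    def validateMemory(requestedMb: Int, overheadMb: Int, maxMemMb: Int, what: String): Unit = {
      val totalMb = requestedMb + overheadMb
      if (totalMb > maxMemMb) {
        throw new IllegalArgumentException(
          s"Required $what memory ($requestedMb+$overheadMb MB) is above the max threshold " +
            s"($maxMemMb MB) of this cluster! Please increase the value of " +
            "'yarn.scheduler.maximum-allocation-mb'.")
      }
    }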
spark git commit: [SPARK-5155] [PYSPARK] [STREAMING] Mqtt streaming support in Python
Repository: spark Updated Branches: refs/heads/master c4fd2a242 - 853809e94 [SPARK-5155] [PYSPARK] [STREAMING] Mqtt streaming support in Python This PR is based on #4229, thanks prabeesh. Closes #4229 Author: Prabeesh K prabsma...@gmail.com Author: zsxwing zsxw...@gmail.com Author: prabs prabsma...@gmail.com Author: Prabeesh K prabees...@namshi.com Closes #7833 from zsxwing/pr4229 and squashes the following commits: 9570bec [zsxwing] Fix the variable name and check null in finally 4a9c79e [zsxwing] Fix pom.xml indentation abf5f18 [zsxwing] Merge branch 'master' into pr4229 935615c [zsxwing] Fix the flaky MQTT tests 47278c5 [zsxwing] Include the project class files 478f844 [zsxwing] Add unpack 5f8a1d4 [zsxwing] Make the maven build generate the test jar for Python MQTT tests 734db99 [zsxwing] Merge branch 'master' into pr4229 126608a [Prabeesh K] address the comments b90b709 [Prabeesh K] Merge pull request #1 from zsxwing/pr4229 d07f454 [zsxwing] Register StreamingListerner before starting StreamingContext; Revert unncessary changes; fix the python unit test a6747cb [Prabeesh K] wait for starting the receiver before publishing data 87fc677 [Prabeesh K] address the comments: 97244ec [zsxwing] Make sbt build the assembly test jar for streaming mqtt 80474d1 [Prabeesh K] fix 1f0cfe9 [Prabeesh K] python style fix e1ee016 [Prabeesh K] scala style fix a5a8f9f [Prabeesh K] added Python test 9767d82 [Prabeesh K] implemented Python-friendly class a11968b [Prabeesh K] fixed python style 795ec27 [Prabeesh K] address comments ee387ae [Prabeesh K] Fix assembly jar location of mqtt-assembly 3f4df12 [Prabeesh K] updated version b34c3c1 [prabs] adress comments 3aa7fff [prabs] Added Python streaming mqtt word count example b7d42ff [prabs] Mqtt streaming support in Python Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/853809e9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/853809e9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/853809e9 Branch: refs/heads/master Commit: 853809e948e7c5092643587a30738115b6591a59 Parents: c4fd2a2 Author: Prabeesh K prabsma...@gmail.com Authored: Mon Aug 10 16:33:23 2015 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Mon Aug 10 16:33:23 2015 -0700 -- dev/run-tests.py| 2 + dev/sparktestsupport/modules.py | 2 + docs/streaming-programming-guide.md | 2 +- .../src/main/python/streaming/mqtt_wordcount.py | 58 + external/mqtt-assembly/pom.xml | 102 external/mqtt/pom.xml | 28 + external/mqtt/src/main/assembly/assembly.xml| 44 +++ .../apache/spark/streaming/mqtt/MQTTUtils.scala | 16 +++ .../spark/streaming/mqtt/MQTTStreamSuite.scala | 118 +++ .../spark/streaming/mqtt/MQTTTestUtils.scala| 111 + pom.xml | 1 + project/SparkBuild.scala| 12 +- python/pyspark/streaming/mqtt.py| 72 +++ python/pyspark/streaming/tests.py | 106 - 14 files changed, 565 insertions(+), 109 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/dev/run-tests.py -- diff --git a/dev/run-tests.py b/dev/run-tests.py index d1852b9..f689425 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -303,6 +303,8 @@ def build_spark_sbt(hadoop_version): assembly/assembly, streaming-kafka-assembly/assembly, streaming-flume-assembly/assembly, + streaming-mqtt-assembly/assembly, + streaming-mqtt/test:assembly, streaming-kinesis-asl-assembly/assembly] profiles_and_goals = build_profiles + sbt_goals http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/dev/sparktestsupport/modules.py -- diff --git 
a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index a9717ff..d82c0cc 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -181,6 +181,7 @@ streaming_mqtt = Module( dependencies=[streaming], source_file_regexes=[ external/mqtt, +external/mqtt-assembly, ], sbt_test_goals=[ streaming-mqtt/test, @@ -306,6 +307,7 @@ pyspark_streaming = Module( streaming, streaming_kafka, streaming_flume_assembly, +streaming_mqtt, streaming_kinesis_asl ], source_file_regexes=[ http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/docs/streaming-programming-guide.md
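For reference, a minimal sketch of the existing Scala entry point that this PR wraps for Python (broker URL and topic are placeholders; `sc` is assumed to be an existing SparkContext):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.mqtt.MQTTUtils

    val ssc = new StreamingContext(sc, Seconds(1))
    // Subscribe to an MQTT topic and run a simple word count over each batch.
    val lines = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "foo")
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()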
spark git commit: [SPARK-5155] [PYSPARK] [STREAMING] Mqtt streaming support in Python
Repository: spark Updated Branches: refs/heads/branch-1.5 51406becc - 8f4014fda [SPARK-5155] [PYSPARK] [STREAMING] Mqtt streaming support in Python This PR is based on #4229, thanks prabeesh. Closes #4229 Author: Prabeesh K prabsma...@gmail.com Author: zsxwing zsxw...@gmail.com Author: prabs prabsma...@gmail.com Author: Prabeesh K prabees...@namshi.com Closes #7833 from zsxwing/pr4229 and squashes the following commits: 9570bec [zsxwing] Fix the variable name and check null in finally 4a9c79e [zsxwing] Fix pom.xml indentation abf5f18 [zsxwing] Merge branch 'master' into pr4229 935615c [zsxwing] Fix the flaky MQTT tests 47278c5 [zsxwing] Include the project class files 478f844 [zsxwing] Add unpack 5f8a1d4 [zsxwing] Make the maven build generate the test jar for Python MQTT tests 734db99 [zsxwing] Merge branch 'master' into pr4229 126608a [Prabeesh K] address the comments b90b709 [Prabeesh K] Merge pull request #1 from zsxwing/pr4229 d07f454 [zsxwing] Register StreamingListerner before starting StreamingContext; Revert unncessary changes; fix the python unit test a6747cb [Prabeesh K] wait for starting the receiver before publishing data 87fc677 [Prabeesh K] address the comments: 97244ec [zsxwing] Make sbt build the assembly test jar for streaming mqtt 80474d1 [Prabeesh K] fix 1f0cfe9 [Prabeesh K] python style fix e1ee016 [Prabeesh K] scala style fix a5a8f9f [Prabeesh K] added Python test 9767d82 [Prabeesh K] implemented Python-friendly class a11968b [Prabeesh K] fixed python style 795ec27 [Prabeesh K] address comments ee387ae [Prabeesh K] Fix assembly jar location of mqtt-assembly 3f4df12 [Prabeesh K] updated version b34c3c1 [prabs] adress comments 3aa7fff [prabs] Added Python streaming mqtt word count example b7d42ff [prabs] Mqtt streaming support in Python (cherry picked from commit 853809e948e7c5092643587a30738115b6591a59) Signed-off-by: Tathagata Das tathagata.das1...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f4014fd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f4014fd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f4014fd Branch: refs/heads/branch-1.5 Commit: 8f4014fdaf22dd8a3bd4728987c76c11d79e07d9 Parents: 51406be Author: Prabeesh K prabsma...@gmail.com Authored: Mon Aug 10 16:33:23 2015 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Mon Aug 10 16:33:34 2015 -0700 -- dev/run-tests.py| 2 + dev/sparktestsupport/modules.py | 2 + docs/streaming-programming-guide.md | 2 +- .../src/main/python/streaming/mqtt_wordcount.py | 58 + external/mqtt-assembly/pom.xml | 102 external/mqtt/pom.xml | 28 + external/mqtt/src/main/assembly/assembly.xml| 44 +++ .../apache/spark/streaming/mqtt/MQTTUtils.scala | 16 +++ .../spark/streaming/mqtt/MQTTStreamSuite.scala | 118 +++ .../spark/streaming/mqtt/MQTTTestUtils.scala| 111 + pom.xml | 1 + project/SparkBuild.scala| 12 +- python/pyspark/streaming/mqtt.py| 72 +++ python/pyspark/streaming/tests.py | 106 - 14 files changed, 565 insertions(+), 109 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8f4014fd/dev/run-tests.py -- diff --git a/dev/run-tests.py b/dev/run-tests.py index d1852b9..f689425 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -303,6 +303,8 @@ def build_spark_sbt(hadoop_version): assembly/assembly, streaming-kafka-assembly/assembly, streaming-flume-assembly/assembly, + streaming-mqtt-assembly/assembly, + streaming-mqtt/test:assembly, streaming-kinesis-asl-assembly/assembly] profiles_and_goals = 
build_profiles + sbt_goals http://git-wip-us.apache.org/repos/asf/spark/blob/8f4014fd/dev/sparktestsupport/modules.py -- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index a9717ff..d82c0cc 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -181,6 +181,7 @@ streaming_mqtt = Module( dependencies=[streaming], source_file_regexes=[ external/mqtt, +external/mqtt-assembly, ], sbt_test_goals=[ streaming-mqtt/test, @@ -306,6 +307,7 @@ pyspark_streaming = Module( streaming, streaming_kafka, streaming_flume_assembly, +streaming_mqtt, streaming_kinesis_asl
spark git commit: [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.
Repository: spark Updated Branches: refs/heads/branch-1.4 4b5bbc589 - 6dde38026 [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files. Spark streaming deletes the temp file and backup files without checking if they exist or not Author: Hao Zhu viadea...@gmail.com Closes #8082 from viadea/master and squashes the following commits: 242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of those files fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before deleting backupFile files. 087daf0 [Hao Zhu] SPARK-9801 (cherry picked from commit 3c9802d9400bea802984456683b2736a450ee17e) Signed-off-by: Tathagata Das tathagata.das1...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6dde3802 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6dde3802 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6dde3802 Branch: refs/heads/branch-1.4 Commit: 6dde38026113d8f83190e801a0f889c53bbc316d Parents: 4b5bbc5 Author: Hao Zhu viadea...@gmail.com Authored: Mon Aug 10 17:17:22 2015 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Mon Aug 10 17:17:47 2015 -0700 -- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6dde3802/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 5279331..bd117ed 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -189,7 +189,9 @@ class CheckpointWriter( + ') // Write checkpoint to temp file - fs.delete(tempFile, true) // just in case it exists + if (fs.exists(tempFile)) { +fs.delete(tempFile, true) // just in case it exists + } val fos = fs.create(tempFile) Utils.tryWithSafeFinally { fos.write(bytes) @@ -200,7 +202,9 @@ class CheckpointWriter( // If the checkpoint file exists, back it up // If the backup exists as well, just delete it, otherwise rename will fail if (fs.exists(checkpointFile)) { -fs.delete(backupFile, true) // just in case it exists +if (fs.exists(backupFile)){ + fs.delete(backupFile, true) // just in case it exists +} if (!fs.rename(checkpointFile, backupFile)) { logWarning(Could not rename + checkpointFile + to + backupFile) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
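The fix itself is a small guard, repeated for the temp and backup files; the same change is cherry-picked to the other branches below. A minimal standalone sketch (the helper name is made up, not part of the patch):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Only delete when the file is actually present, so the checkpoint writer
    // no longer issues deletes for temp/backup files that were never created.
    def deleteIfExists(fs: FileSystem, path: Path): Unit = {
      if (fs.exists(path)) {
        fs.delete(path, true)
      }
    }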
spark git commit: [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.
Repository: spark Updated Branches: refs/heads/branch-1.5 8f4014fda - 94692bb14 [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files. Spark streaming deletes the temp file and backup files without checking if they exist or not Author: Hao Zhu viadea...@gmail.com Closes #8082 from viadea/master and squashes the following commits: 242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of those files fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before deleting backupFile files. 087daf0 [Hao Zhu] SPARK-9801 (cherry picked from commit 3c9802d9400bea802984456683b2736a450ee17e) Signed-off-by: Tathagata Das tathagata.das1...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94692bb1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94692bb1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94692bb1 Branch: refs/heads/branch-1.5 Commit: 94692bb14f75b814aab00bc43f15550e26ada6f1 Parents: 8f4014f Author: Hao Zhu viadea...@gmail.com Authored: Mon Aug 10 17:17:22 2015 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Mon Aug 10 17:17:33 2015 -0700 -- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/94692bb1/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 2780d5b..6f6b449 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -192,7 +192,9 @@ class CheckpointWriter( + ') // Write checkpoint to temp file - fs.delete(tempFile, true) // just in case it exists + if (fs.exists(tempFile)) { +fs.delete(tempFile, true) // just in case it exists + } val fos = fs.create(tempFile) Utils.tryWithSafeFinally { fos.write(bytes) @@ -203,7 +205,9 @@ class CheckpointWriter( // If the checkpoint file exists, back it up // If the backup exists as well, just delete it, otherwise rename will fail if (fs.exists(checkpointFile)) { -fs.delete(backupFile, true) // just in case it exists +if (fs.exists(backupFile)){ + fs.delete(backupFile, true) // just in case it exists +} if (!fs.rename(checkpointFile, backupFile)) { logWarning(Could not rename + checkpointFile + to + backupFile) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.
Repository: spark Updated Branches: refs/heads/master 853809e94 - 3c9802d94 [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files. Spark streaming deletes the temp file and backup files without checking if they exist or not Author: Hao Zhu viadea...@gmail.com Closes #8082 from viadea/master and squashes the following commits: 242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of those files fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before deleting backupFile files. 087daf0 [Hao Zhu] SPARK-9801 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c9802d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c9802d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c9802d9 Branch: refs/heads/master Commit: 3c9802d9400bea802984456683b2736a450ee17e Parents: 853809e Author: Hao Zhu viadea...@gmail.com Authored: Mon Aug 10 17:17:22 2015 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Mon Aug 10 17:17:22 2015 -0700 -- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c9802d9/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 2780d5b..6f6b449 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -192,7 +192,9 @@ class CheckpointWriter( + ') // Write checkpoint to temp file - fs.delete(tempFile, true) // just in case it exists + if (fs.exists(tempFile)) { +fs.delete(tempFile, true) // just in case it exists + } val fos = fs.create(tempFile) Utils.tryWithSafeFinally { fos.write(bytes) @@ -203,7 +205,9 @@ class CheckpointWriter( // If the checkpoint file exists, back it up // If the backup exists as well, just delete it, otherwise rename will fail if (fs.exists(checkpointFile)) { -fs.delete(backupFile, true) // just in case it exists +if (fs.exists(backupFile)){ + fs.delete(backupFile, true) // just in case it exists +} if (!fs.rename(checkpointFile, backupFile)) { logWarning(Could not rename + checkpointFile + to + backupFile) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.
Repository: spark Updated Branches: refs/heads/branch-1.3 b104501d3 - a98603f8c [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files. Spark streaming deletes the temp file and backup files without checking if they exist or not Author: Hao Zhu viadea...@gmail.com Closes #8082 from viadea/master and squashes the following commits: 242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of those files fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before deleting backupFile files. 087daf0 [Hao Zhu] SPARK-9801 (cherry picked from commit 3c9802d9400bea802984456683b2736a450ee17e) Signed-off-by: Tathagata Das tathagata.das1...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a98603f8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a98603f8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a98603f8 Branch: refs/heads/branch-1.3 Commit: a98603f8c118fcd23efe80ebaa120e47e9785d46 Parents: b104501 Author: Hao Zhu viadea...@gmail.com Authored: Mon Aug 10 17:17:22 2015 -0700 Committer: Tathagata Das tathagata.das1...@gmail.com Committed: Mon Aug 10 17:18:03 2015 -0700 -- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a98603f8/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 832ce78..c1d0fe4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -137,7 +137,9 @@ class CheckpointWriter( + ') // Write checkpoint to temp file - fs.delete(tempFile, true) // just in case it exists + if (fs.exists(tempFile)) { +fs.delete(tempFile, true) // just in case it exists + } val fos = fs.create(tempFile) fos.write(bytes) fos.close() @@ -145,7 +147,9 @@ class CheckpointWriter( // If the checkpoint file exists, back it up // If the backup exists as well, just delete it, otherwise rename will fail if (fs.exists(checkpointFile)) { -fs.delete(backupFile, true) // just in case it exists +if (fs.exists(backupFile)){ + fs.delete(backupFile, true) // just in case it exists +} if (!fs.rename(checkpointFile, backupFile)) { logWarning(Could not rename + checkpointFile + to + backupFile) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[04/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala new file mode 100644 index 000..8f06de7 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -0,0 +1,916 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag + +import org.apache.parquet.schema.MessageTypeParser + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.types._ + +abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest { + val sqlContext = TestSQLContext + + /** + * Checks whether the reflected Parquet message type for product type `T` conforms `messageType`. + */ + protected def testSchemaInference[T : Product: ClassTag: TypeTag]( + testName: String, + messageType: String, + binaryAsString: Boolean = true, + int96AsTimestamp: Boolean = true, + followParquetFormatSpec: Boolean = false, + isThriftDerived: Boolean = false): Unit = { +testSchema( + testName, + StructType.fromAttributes(ScalaReflection.attributesFor[T]), + messageType, + binaryAsString, + int96AsTimestamp, + followParquetFormatSpec, + isThriftDerived) + } + + protected def testParquetToCatalyst( + testName: String, + sqlSchema: StructType, + parquetSchema: String, + binaryAsString: Boolean = true, + int96AsTimestamp: Boolean = true, + followParquetFormatSpec: Boolean = false, + isThriftDerived: Boolean = false): Unit = { +val converter = new CatalystSchemaConverter( + assumeBinaryIsString = binaryAsString, + assumeInt96IsTimestamp = int96AsTimestamp, + followParquetFormatSpec = followParquetFormatSpec) + +test(ssql = parquet: $testName) { + val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema)) + val expected = sqlSchema + assert( +actual === expected, +sSchema mismatch. 
+ |Expected schema: ${expected.json} + |Actual schema: ${actual.json} + .stripMargin) +} + } + + protected def testCatalystToParquet( + testName: String, + sqlSchema: StructType, + parquetSchema: String, + binaryAsString: Boolean = true, + int96AsTimestamp: Boolean = true, + followParquetFormatSpec: Boolean = false, + isThriftDerived: Boolean = false): Unit = { +val converter = new CatalystSchemaConverter( + assumeBinaryIsString = binaryAsString, + assumeInt96IsTimestamp = int96AsTimestamp, + followParquetFormatSpec = followParquetFormatSpec) + +test(ssql = parquet: $testName) { + val actual = converter.convert(sqlSchema) + val expected = MessageTypeParser.parseMessageType(parquetSchema) + actual.checkContains(expected) + expected.checkContains(actual) +} + } + + protected def testSchema( + testName: String, + sqlSchema: StructType, + parquetSchema: String, + binaryAsString: Boolean = true, + int96AsTimestamp: Boolean = true, + followParquetFormatSpec: Boolean = false, + isThriftDerived: Boolean = false): Unit = { + +testCatalystToParquet( + testName, + sqlSchema, + parquetSchema, + binaryAsString, + int96AsTimestamp, + followParquetFormatSpec, + isThriftDerived) + +testParquetToCatalyst( + testName, + sqlSchema, + parquetSchema, + binaryAsString, + int96AsTimestamp, + followParquetFormatSpec, + isThriftDerived) + } +} + +class ParquetSchemaInferenceSuite extends ParquetSchemaTest { +
[04/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala new file mode 100644 index 000..8f06de7 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -0,0 +1,916 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag + +import org.apache.parquet.schema.MessageTypeParser + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.types._ + +abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest { + val sqlContext = TestSQLContext + + /** + * Checks whether the reflected Parquet message type for product type `T` conforms `messageType`. + */ + protected def testSchemaInference[T : Product: ClassTag: TypeTag]( + testName: String, + messageType: String, + binaryAsString: Boolean = true, + int96AsTimestamp: Boolean = true, + followParquetFormatSpec: Boolean = false, + isThriftDerived: Boolean = false): Unit = { +testSchema( + testName, + StructType.fromAttributes(ScalaReflection.attributesFor[T]), + messageType, + binaryAsString, + int96AsTimestamp, + followParquetFormatSpec, + isThriftDerived) + } + + protected def testParquetToCatalyst( + testName: String, + sqlSchema: StructType, + parquetSchema: String, + binaryAsString: Boolean = true, + int96AsTimestamp: Boolean = true, + followParquetFormatSpec: Boolean = false, + isThriftDerived: Boolean = false): Unit = { +val converter = new CatalystSchemaConverter( + assumeBinaryIsString = binaryAsString, + assumeInt96IsTimestamp = int96AsTimestamp, + followParquetFormatSpec = followParquetFormatSpec) + +test(ssql = parquet: $testName) { + val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema)) + val expected = sqlSchema + assert( +actual === expected, +sSchema mismatch. 
+ |Expected schema: ${expected.json} + |Actual schema: ${actual.json} + .stripMargin) +} + } + + protected def testCatalystToParquet( + testName: String, + sqlSchema: StructType, + parquetSchema: String, + binaryAsString: Boolean = true, + int96AsTimestamp: Boolean = true, + followParquetFormatSpec: Boolean = false, + isThriftDerived: Boolean = false): Unit = { +val converter = new CatalystSchemaConverter( + assumeBinaryIsString = binaryAsString, + assumeInt96IsTimestamp = int96AsTimestamp, + followParquetFormatSpec = followParquetFormatSpec) + +test(ssql = parquet: $testName) { + val actual = converter.convert(sqlSchema) + val expected = MessageTypeParser.parseMessageType(parquetSchema) + actual.checkContains(expected) + expected.checkContains(actual) +} + } + + protected def testSchema( + testName: String, + sqlSchema: StructType, + parquetSchema: String, + binaryAsString: Boolean = true, + int96AsTimestamp: Boolean = true, + followParquetFormatSpec: Boolean = false, + isThriftDerived: Boolean = false): Unit = { + +testCatalystToParquet( + testName, + sqlSchema, + parquetSchema, + binaryAsString, + int96AsTimestamp, + followParquetFormatSpec, + isThriftDerived) + +testParquetToCatalyst( + testName, + sqlSchema, + parquetSchema, + binaryAsString, + int96AsTimestamp, + followParquetFormatSpec, + isThriftDerived) + } +} + +class ParquetSchemaInferenceSuite extends ParquetSchemaTest { +
[10/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala new file mode 100644 index 000..0b0867f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.spark.Logging +import org.apache.spark.sql.SQLContext +import org.apache.spark.ui.{SparkUI, SparkUITab} + +private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI) + extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging { + + val parent = sparkUI + val listener = sqlContext.listener + + attachPage(new AllExecutionsPage(this)) + attachPage(new ExecutionPage(this)) + parent.attachTab(this) + + parent.addStaticHandler(SQLTab.STATIC_RESOURCE_DIR, /static/sql) +} + +private[sql] object SQLTab { + + private val STATIC_RESOURCE_DIR = org/apache/spark/sql/execution/ui/static + + private val nextTabId = new AtomicInteger(0) + + private def nextTabName: String = { +val nextId = nextTabId.getAndIncrement() +if (nextId == 0) SQL else sSQL$nextId + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala new file mode 100644 index 000..ae3d752 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+
+/**
+ * A graph used for storing information of an executionPlan of DataFrame.
+ *
+ * Each graph is defined with a set of nodes and a set of edges. Each node represents a node in the
+ * SparkPlan tree, and each edge represents a parent-child relationship between two nodes.
+ */
+private[ui] case class SparkPlanGraph(
+    nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) {
+
+  def makeDotFile(metrics: Map[Long, Any]): String = {
+    val dotFile = new StringBuilder
+    dotFile.append("digraph G {\n")
+    nodes.foreach(node => dotFile.append(node.makeDotNode(metrics) + "\n"))
+    edges.foreach(edge => dotFile.append(edge.makeDotEdge + "\n"))
+    dotFile.append("}")
+    dotFile.toString()
+  }
+}
+
+private[sql] object SparkPlanGraph {
+
+  /**
+   * Build a SparkPlanGraph from the root of a SparkPlan tree.
+   */
+  def apply(plan: SparkPlan): SparkPlanGraph = {
+    val nodeIdGenerator = new AtomicLong(0)
+    val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
+    val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
+    buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
+    new
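A brief usage sketch of the class above, as the SQL UI would drive it; the variable names are illustrative and not part of the patch:

    // Assuming `plan: SparkPlan` and `metricValues: Map[Long, Any]` are in scope.
    val graph = SparkPlanGraph(plan)              // walks the SparkPlan tree into nodes and edges
    val dot: String = graph.makeDotFile(metricValues)
    // `dot` is a "digraph G { ... }" description rendered by the UI's spark-sql-viz scripts.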
[08/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala deleted file mode 100644 index b6db71b..000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ /dev/null @@ -1,796 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.net.URI -import java.util.logging.{Level, Logger = JLogger} -import java.util.{List = JList} - -import scala.collection.JavaConversions._ -import scala.collection.mutable -import scala.util.{Failure, Try} - -import com.google.common.base.Objects -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.apache.parquet.hadoop.util.ContextUtil -import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _} -import org.apache.parquet.schema.MessageType -import org.apache.parquet.{Log = ParquetLog} - -import org.apache.spark.{Logging, Partition = SparkPartition, SparkException} -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD} -import org.apache.spark.rdd.RDD._ -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionSpec -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.util.{SerializableConfiguration, Utils} - - -private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { - - def format(): String = parquet - - override def createRelation( - sqlContext: SQLContext, - paths: Array[String], - schema: Option[StructType], - partitionColumns: Option[StructType], - parameters: Map[String, String]): HadoopFsRelation = { -new ParquetRelation(paths, schema, None, partitionColumns, parameters)(sqlContext) - } -} - -// NOTE: This class is instantiated and used on executor side only, no need to be serializable. -private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext) - extends OutputWriter { - - private val recordWriter: RecordWriter[Void, InternalRow] = { -val outputFormat = { - new ParquetOutputFormat[InternalRow]() { -// Here we override `getDefaultWorkFile` for two reasons: -// -// 1. To allow appending. 
We need to generate unique output file names to avoid -// overwriting existing files (either exist before the write job, or are just written -// by other tasks within the same write job). -// -// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses -// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all -// partitions in the case of dynamic partitioning. -override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val uniqueWriteJobId = context.getConfiguration.get(spark.sql.sources.writeJobUUID) - val split = context.getTaskAttemptID.getTaskID.getId - new Path(path, fpart-r-$split%05d-$uniqueWriteJobId$extension) -} - } -} - -outputFormat.getRecordWriter(context) - } - - override def write(row: Row): Unit = throw new UnsupportedOperationException(call writeInternal) - - override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row) - - override def close(): Unit = recordWriter.close(context) -} - -private[sql] class ParquetRelation( -override val paths: Array[String], -private val maybeDataSchema: Option[StructType], -// This is for metastore conversion. -private
[03/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala deleted file mode 100644 index 92022ff..000 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ /dev/null @@ -1,1172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.json - -import java.io.{File, StringWriter} -import java.sql.{Date, Timestamp} - -import com.fasterxml.jackson.core.JsonFactory -import org.apache.spark.rdd.RDD -import org.scalactic.Tolerance._ - -import org.apache.spark.sql.{SQLContext, QueryTest, Row, SQLConf} -import org.apache.spark.sql.TestData._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation} -import org.apache.spark.sql.json.InferSchema.compatibleType -import org.apache.spark.sql.types._ -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.util.Utils - -class JsonSuite extends QueryTest with SQLTestUtils with TestJsonData { - - protected lazy val ctx = org.apache.spark.sql.test.TestSQLContext - override def sqlContext: SQLContext = ctx // used by SQLTestUtils - - import ctx.sql - import ctx.implicits._ - - test(Type promotion) { -def checkTypePromotion(expected: Any, actual: Any) { - assert(expected.getClass == actual.getClass, -sFailed to promote ${actual.getClass} to ${expected.getClass}.) - assert(expected == actual, -sPromoted value ${actual}(${actual.getClass}) does not equal the expected value + - s${expected}(${expected.getClass}).) 
-} - -val factory = new JsonFactory() -def enforceCorrectType(value: Any, dataType: DataType): Any = { - val writer = new StringWriter() - val generator = factory.createGenerator(writer) - generator.writeObject(value) - generator.flush() - - val parser = factory.createParser(writer.toString) - parser.nextToken() - JacksonParser.convertField(factory, parser, dataType) -} - -val intNumber: Int = 2147483647 -checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType)) -checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, LongType)) -checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, DoubleType)) -checkTypePromotion( - Decimal(intNumber), enforceCorrectType(intNumber, DecimalType.SYSTEM_DEFAULT)) - -val longNumber: Long = 9223372036854775807L -checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType)) -checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, DoubleType)) -checkTypePromotion( - Decimal(longNumber), enforceCorrectType(longNumber, DecimalType.SYSTEM_DEFAULT)) - -val doubleNumber: Double = 1.7976931348623157E308d -checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType)) - -checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber)), -enforceCorrectType(intNumber, TimestampType)) -checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)), -enforceCorrectType(intNumber.toLong, TimestampType)) -val strTime = 2014-09-30 12:34:56 - checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), -enforceCorrectType(strTime, TimestampType)) - -val strDate = 2014-10-15 -checkTypePromotion( - DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType)) - -val ISO8601Time1 = 1970-01-01T01:00:01.0Z -checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)), -enforceCorrectType(ISO8601Time1, TimestampType)) -checkTypePromotion(DateTimeUtils.millisToDays(3601000), - enforceCorrectType(ISO8601Time1, DateType)) -val ISO8601Time2 = 1970-01-01T02:00:01-01:00 -checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)), -
[02/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala deleted file mode 100644 index b415da5..000 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag -import scala.reflect.runtime.universe.TypeTag - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} -import org.apache.parquet.example.data.simple.SimpleGroup -import org.apache.parquet.example.data.{Group, GroupWriter} -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.hadoop.api.WriteSupport.WriteContext -import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} -import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter} -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.{MessageType, MessageTypeParser} - -import org.apache.spark.SparkException -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.types._ - -// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport -// with an empty configuration (it is after all not intended to be used in this way?) -// and members are private so we need to make our own in order to pass the schema -// to the writer. -private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] { - var groupWriter: GroupWriter = null - - override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { -groupWriter = new GroupWriter(recordConsumer, schema) - } - - override def init(configuration: Configuration): WriteContext = { -new WriteContext(schema, new java.util.HashMap[String, String]()) - } - - override def write(record: Group) { -groupWriter.write(record) - } -} - -/** - * A test suite that tests basic Parquet I/O. - */ -class ParquetIOSuite extends QueryTest with ParquetTest { - lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext - import sqlContext.implicits._ - - /** - * Writes `data` to a Parquet file, reads it back and check file contents. 
- */ - protected def checkParquetFile[T : Product : ClassTag: TypeTag](data: Seq[T]): Unit = { -withParquetDataFrame(data)(r = checkAnswer(r, data.map(Row.fromTuple))) - } - - test(basic data types (without binary)) { -val data = (1 to 4).map { i = - (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) -} -checkParquetFile(data) - } - - test(raw binary) { -val data = (1 to 4).map(i = Tuple1(Array.fill(3)(i.toByte))) -withParquetDataFrame(data) { df = - assertResult(data.map(_._1.mkString(,)).sorted) { -df.collect().map(_.getAs[Array[Byte]](0).mkString(,)).sorted - } -} - } - - test(string) { -val data = (1 to 4).map(i = Tuple1(i.toString)) -// Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL -// as we store Spark SQL schema in the extra metadata. -withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key - false)(checkParquetFile(data)) -withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key - true)(checkParquetFile(data)) - } - - test(fixed-length decimals) { -def makeDecimalRDD(decimal: DecimalType): DataFrame = - sqlContext.sparkContext -.parallelize(0 to 1000) -.map(i = Tuple1(i / 100.0)) -.toDF() -// Parquet doesn't allow column names with spaces, have to add an alias here -.select($_1 cast decimal as dec) - -for ((precision, scale) - Seq((5, 2), (1, 0), (1, 1), (18, 10), (18,
[11/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala new file mode 100644 index 000..4086a13 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.net.URI +import java.util.logging.{Level, Logger = JLogger} +import java.util.{List = JList} + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.util.{Failure, Try} + +import com.google.common.base.Objects +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.apache.parquet.hadoop.util.ContextUtil +import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _} +import org.apache.parquet.schema.MessageType +import org.apache.parquet.{Log = ParquetLog} + +import org.apache.spark.{Logging, Partition = SparkPartition, SparkException} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD} +import org.apache.spark.rdd.RDD._ +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionSpec +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.util.{SerializableConfiguration, Utils} + + +private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { + + override def shortName(): String = parquet + + override def createRelation( + sqlContext: SQLContext, + paths: Array[String], + schema: Option[StructType], + partitionColumns: Option[StructType], + parameters: Map[String, String]): HadoopFsRelation = { +new ParquetRelation(paths, schema, None, partitionColumns, parameters)(sqlContext) + } +} + +// NOTE: This class is instantiated and used on executor side only, no need to be serializable. 
+private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext) + extends OutputWriter { + + private val recordWriter: RecordWriter[Void, InternalRow] = { +val outputFormat = { + new ParquetOutputFormat[InternalRow]() { +// Here we override `getDefaultWorkFile` for two reasons: +// +// 1. To allow appending. We need to generate unique output file names to avoid +// overwriting existing files (either exist before the write job, or are just written +// by other tasks within the same write job). +// +// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses +// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all +// partitions in the case of dynamic partitioning. +override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val uniqueWriteJobId = context.getConfiguration.get(spark.sql.sources.writeJobUUID) + val split = context.getTaskAttemptID.getTaskID.getId + new Path(path, fpart-r-$split%05d-$uniqueWriteJobId$extension) +} + } +} + +outputFormat.getRecordWriter(context) + } + + override def write(row: Row): Unit = throw new UnsupportedOperationException(call writeInternal) + + override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row) + + override def close(): Unit = recordWriter.close(context) +} + +private[sql] class ParquetRelation( +override val paths:
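The getDefaultWorkFile override above derives per-task output file names from the task split id and the job-wide write UUID. A small sketch of the resulting name, with made-up values for illustration only:

    // Hypothetical values; `uniqueWriteJobId` comes from spark.sql.sources.writeJobUUID.
    val split = 3
    val uniqueWriteJobId = "1b2c3d4e-uuid"
    val extension = ".gz.parquet"
    val fileName = f"part-r-$split%05d-$uniqueWriteJobId$extension"
    // fileName == "part-r-00003-1b2c3d4e-uuid.gz.parquet"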
[11/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala new file mode 100644 index 000..4086a13 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.net.URI +import java.util.logging.{Level, Logger = JLogger} +import java.util.{List = JList} + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.util.{Failure, Try} + +import com.google.common.base.Objects +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.apache.parquet.hadoop.util.ContextUtil +import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _} +import org.apache.parquet.schema.MessageType +import org.apache.parquet.{Log = ParquetLog} + +import org.apache.spark.{Logging, Partition = SparkPartition, SparkException} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD} +import org.apache.spark.rdd.RDD._ +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionSpec +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.util.{SerializableConfiguration, Utils} + + +private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { + + override def shortName(): String = parquet + + override def createRelation( + sqlContext: SQLContext, + paths: Array[String], + schema: Option[StructType], + partitionColumns: Option[StructType], + parameters: Map[String, String]): HadoopFsRelation = { +new ParquetRelation(paths, schema, None, partitionColumns, parameters)(sqlContext) + } +} + +// NOTE: This class is instantiated and used on executor side only, no need to be serializable. 
+private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext) + extends OutputWriter { + + private val recordWriter: RecordWriter[Void, InternalRow] = { +val outputFormat = { + new ParquetOutputFormat[InternalRow]() { +// Here we override `getDefaultWorkFile` for two reasons: +// +// 1. To allow appending. We need to generate unique output file names to avoid +// overwriting existing files (either exist before the write job, or are just written +// by other tasks within the same write job). +// +// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses +// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all +// partitions in the case of dynamic partitioning. +override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val uniqueWriteJobId = context.getConfiguration.get(spark.sql.sources.writeJobUUID) + val split = context.getTaskAttemptID.getTaskID.getId + new Path(path, fpart-r-$split%05d-$uniqueWriteJobId$extension) +} + } +} + +outputFormat.getRecordWriter(context) + } + + override def write(row: Row): Unit = throw new UnsupportedOperationException(call writeInternal) + + override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row) + + override def close(): Unit = recordWriter.close(context) +} + +private[sql] class ParquetRelation( +override val paths:
[13/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala new file mode 100644 index 000..8eab6a0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.jdbc + +import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException} +import java.util.Properties + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.jdbc.JdbcDialects +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} + +/** + * Data corresponding to one partition of a JDBCRDD. + */ +private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition { + override def index: Int = idx +} + + +private[sql] object JDBCRDD extends Logging { + + /** + * Maps a JDBC type to a Catalyst type. This function is called only when + * the JdbcDialect class corresponding to your database driver returns null. + * + * @param sqlType - A field of java.sql.Types + * @return The Catalyst type corresponding to sqlType. 
+ */
+  private def getCatalystType(
+      sqlType: Int,
+      precision: Int,
+      scale: Int,
+      signed: Boolean): DataType = {
+    val answer = sqlType match {
+      // scalastyle:off
+      case java.sql.Types.ARRAY         => null
+      case java.sql.Types.BIGINT        => if (signed) { LongType } else { DecimalType(20,0) }
+      case java.sql.Types.BINARY        => BinaryType
+      case java.sql.Types.BIT           => BooleanType // @see JdbcDialect for quirks
+      case java.sql.Types.BLOB          => BinaryType
+      case java.sql.Types.BOOLEAN       => BooleanType
+      case java.sql.Types.CHAR          => StringType
+      case java.sql.Types.CLOB          => StringType
+      case java.sql.Types.DATALINK      => null
+      case java.sql.Types.DATE          => DateType
+      case java.sql.Types.DECIMAL
+        if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
+      case java.sql.Types.DECIMAL       => DecimalType.SYSTEM_DEFAULT
+      case java.sql.Types.DISTINCT      => null
+      case java.sql.Types.DOUBLE        => DoubleType
+      case java.sql.Types.FLOAT         => FloatType
+      case java.sql.Types.INTEGER       => if (signed) { IntegerType } else { LongType }
+      case java.sql.Types.JAVA_OBJECT   => null
+      case java.sql.Types.LONGNVARCHAR  => StringType
+      case java.sql.Types.LONGVARBINARY => BinaryType
+      case java.sql.Types.LONGVARCHAR   => StringType
+      case java.sql.Types.NCHAR         => StringType
+      case java.sql.Types.NCLOB         => StringType
+      case java.sql.Types.NULL          => null
+      case java.sql.Types.NUMERIC
+        if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
+      case java.sql.Types.NUMERIC       => DecimalType.SYSTEM_DEFAULT
+      case java.sql.Types.NVARCHAR      => StringType
+      case java.sql.Types.OTHER         => null
+      case java.sql.Types.REAL          => DoubleType
+      case java.sql.Types.REF           => StringType
+      case java.sql.Types.ROWID         => LongType
+      case java.sql.Types.SMALLINT      => IntegerType
+      case java.sql.Types.SQLXML        => StringType
+      case java.sql.Types.STRUCT        => StringType
+      case java.sql.Types.TIME          => TimestampType
+      case java.sql.Types.TIMESTAMP     => TimestampType
+      case java.sql.Types.TINYINT       => IntegerType
+      case
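The two DECIMAL/NUMERIC cases above fall back to DecimalType.SYSTEM_DEFAULT only when the driver reports no precision or scale. A tiny illustrative sketch of that rule; the helper name is made up and is not part of JDBCRDD:

    // Hypothetical helper mirroring the guarded DECIMAL cases shown above.
    def decimalFor(precision: Int, scale: Int): DecimalType =
      if (precision != 0 || scale != 0) DecimalType.bounded(precision, scale)
      else DecimalType.SYSTEM_DEFAULT

    decimalFor(10, 2)  // DecimalType(10,2)
    decimalFor(0, 0)   // DecimalType.SYSTEM_DEFAULT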
[08/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala deleted file mode 100644 index b6db71b..000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ /dev/null @@ -1,796 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.net.URI -import java.util.logging.{Level, Logger = JLogger} -import java.util.{List = JList} - -import scala.collection.JavaConversions._ -import scala.collection.mutable -import scala.util.{Failure, Try} - -import com.google.common.base.Objects -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.apache.parquet.hadoop.util.ContextUtil -import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _} -import org.apache.parquet.schema.MessageType -import org.apache.parquet.{Log = ParquetLog} - -import org.apache.spark.{Logging, Partition = SparkPartition, SparkException} -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD} -import org.apache.spark.rdd.RDD._ -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionSpec -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.util.{SerializableConfiguration, Utils} - - -private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { - - def format(): String = parquet - - override def createRelation( - sqlContext: SQLContext, - paths: Array[String], - schema: Option[StructType], - partitionColumns: Option[StructType], - parameters: Map[String, String]): HadoopFsRelation = { -new ParquetRelation(paths, schema, None, partitionColumns, parameters)(sqlContext) - } -} - -// NOTE: This class is instantiated and used on executor side only, no need to be serializable. -private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext) - extends OutputWriter { - - private val recordWriter: RecordWriter[Void, InternalRow] = { -val outputFormat = { - new ParquetOutputFormat[InternalRow]() { -// Here we override `getDefaultWorkFile` for two reasons: -// -// 1. To allow appending. 
We need to generate unique output file names to avoid -// overwriting existing files (either exist before the write job, or are just written -// by other tasks within the same write job). -// -// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses -// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all -// partitions in the case of dynamic partitioning. -override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val uniqueWriteJobId = context.getConfiguration.get(spark.sql.sources.writeJobUUID) - val split = context.getTaskAttemptID.getTaskID.getId - new Path(path, fpart-r-$split%05d-$uniqueWriteJobId$extension) -} - } -} - -outputFormat.getRecordWriter(context) - } - - override def write(row: Row): Unit = throw new UnsupportedOperationException(call writeInternal) - - override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row) - - override def close(): Unit = recordWriter.close(context) -} - -private[sql] class ParquetRelation( -override val paths: Array[String], -private val maybeDataSchema: Option[StructType], -// This is for metastore conversion. -private
[10/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala new file mode 100644 index 000..0b0867f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.spark.Logging +import org.apache.spark.sql.SQLContext +import org.apache.spark.ui.{SparkUI, SparkUITab} + +private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI) + extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging { + + val parent = sparkUI + val listener = sqlContext.listener + + attachPage(new AllExecutionsPage(this)) + attachPage(new ExecutionPage(this)) + parent.attachTab(this) + + parent.addStaticHandler(SQLTab.STATIC_RESOURCE_DIR, /static/sql) +} + +private[sql] object SQLTab { + + private val STATIC_RESOURCE_DIR = org/apache/spark/sql/execution/ui/static + + private val nextTabId = new AtomicInteger(0) + + private def nextTabName: String = { +val nextId = nextTabId.getAndIncrement() +if (nextId == 0) SQL else sSQL$nextId + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala new file mode 100644 index 000..ae3d752 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.ui + +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.mutable + +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue} + +/** + * A graph used for storing information of an executionPlan of DataFrame. + * + * Each graph is defined with a set of nodes and a set of edges. Each node represents a node in the + * SparkPlan tree, and each edge represents a parent-child relationship between two nodes. + */ +private[ui] case class SparkPlanGraph( +nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) { + + def makeDotFile(metrics: Map[Long, Any]): String = { +val dotFile = new StringBuilder +dotFile.append(digraph G {\n) +nodes.foreach(node = dotFile.append(node.makeDotNode(metrics) + \n)) +edges.foreach(edge = dotFile.append(edge.makeDotEdge + \n)) +dotFile.append(}) +dotFile.toString() + } +} + +private[sql] object SparkPlanGraph { + + /** + * Build a SparkPlanGraph from the root of a SparkPlan tree. + */ + def apply(plan: SparkPlan): SparkPlanGraph = { +val nodeIdGenerator = new AtomicLong(0) +val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]() +val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]() +buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges) +new
[14/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
[SPARK-9763][SQL] Minimize exposure of internal SQL classes. There are a few changes in this pull request: 1. Moved all data sources to execution.datasources, except the public JDBC APIs. 2. In order to maintain backward compatibility from 1, added a backward compatibility translation map in data source resolution. 3. Moved ui and metric package into execution. 4. Added more documentation on some internal classes. 5. Renamed DataSourceRegister.format - shortName. 6. Added override modifier on shortName. 7. Removed IntSQLMetric. Author: Reynold Xin r...@databricks.com Closes #8056 from rxin/SPARK-9763 and squashes the following commits: 9df4801 [Reynold Xin] Removed hardcoded name in test cases. d9babc6 [Reynold Xin] Shorten. e484419 [Reynold Xin] Removed VisibleForTesting. 171b812 [Reynold Xin] MimaExcludes. 2041389 [Reynold Xin] Compile ... 79dda42 [Reynold Xin] Compile. 0818ba3 [Reynold Xin] Removed IntSQLMetric. c46884f [Reynold Xin] Two more fixes. f9aa88d [Reynold Xin] [SPARK-9763][SQL] Minimize exposure of internal SQL classes. (cherry picked from commit 40ed2af587cedadc6e5249031857a922b3b234ca) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1838e43 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1838e43 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1838e43 Branch: refs/heads/branch-1.5 Commit: c1838e4309a2286cebc8bc73907b1c403260f22c Parents: d251d9f Author: Reynold Xin r...@databricks.com Authored: Mon Aug 10 13:49:23 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Aug 10 13:49:34 2015 -0700 -- project/MimaExcludes.scala | 24 +- apache.spark.sql.sources.DataSourceRegister |6 +- .../sql/execution/ui/static/spark-sql-viz.css | 37 + .../sql/execution/ui/static/spark-sql-viz.js| 160 +++ .../spark/sql/ui/static/spark-sql-viz.css | 37 - .../apache/spark/sql/ui/static/spark-sql-viz.js | 160 --- .../scala/org/apache/spark/sql/DataFrame.scala |2 +- .../org/apache/spark/sql/DataFrameReader.scala |6 +- .../org/apache/spark/sql/DataFrameWriter.scala |6 +- .../scala/org/apache/spark/sql/SQLContext.scala |2 +- .../spark/sql/execution/SQLExecution.scala |2 +- .../apache/spark/sql/execution/SparkPlan.scala |8 +- .../spark/sql/execution/basicOperators.scala|2 +- .../sql/execution/datasources/DDLParser.scala | 185 +++ .../execution/datasources/DefaultSource.scala | 64 + .../datasources/InsertIntoDataSource.scala | 23 +- .../datasources/ResolvedDataSource.scala| 204 +++ .../spark/sql/execution/datasources/ddl.scala | 352 +- .../datasources/jdbc/DefaultSource.scala| 62 + .../datasources/jdbc/DriverRegistry.scala | 60 + .../datasources/jdbc/DriverWrapper.scala| 48 + .../execution/datasources/jdbc/JDBCRDD.scala| 489 .../datasources/jdbc/JDBCRelation.scala | 113 ++ .../execution/datasources/jdbc/JdbcUtils.scala | 219 .../datasources/json/InferSchema.scala | 207 .../datasources/json/JSONRelation.scala | 204 +++ .../datasources/json/JacksonGenerator.scala | 135 ++ .../datasources/json/JacksonParser.scala| 228 .../datasources/json/JacksonUtils.scala | 32 + .../parquet/CatalystReadSupport.scala | 153 +++ .../parquet/CatalystRecordMaterializer.scala| 41 + .../parquet/CatalystRowConverter.scala | 449 +++ .../parquet/CatalystSchemaConverter.scala | 592 + .../parquet/DirectParquetOutputCommitter.scala | 87 ++ .../datasources/parquet/ParquetConverter.scala | 39 + .../datasources/parquet/ParquetFilters.scala| 360 ++ 
.../datasources/parquet/ParquetRelation.scala | 796 .../parquet/ParquetTableSupport.scala | 322 + .../parquet/ParquetTypesConverter.scala | 159 +++ .../spark/sql/execution/metric/SQLMetrics.scala | 115 ++ .../apache/spark/sql/execution/package.scala|8 +- .../sql/execution/ui/AllExecutionsPage.scala| 238 .../spark/sql/execution/ui/ExecutionPage.scala | 127 ++ .../spark/sql/execution/ui/SQLListener.scala| 352 ++ .../apache/spark/sql/execution/ui/SQLTab.scala | 49 + .../spark/sql/execution/ui/SparkPlanGraph.scala | 118 ++ .../org/apache/spark/sql/jdbc/JDBCRDD.scala | 490 .../apache/spark/sql/jdbc/JDBCRelation.scala| 152 --- .../org/apache/spark/sql/jdbc/JdbcUtils.scala | 52 - .../scala/org/apache/spark/sql/jdbc/jdbc.scala | 250 .../org/apache/spark/sql/json/InferSchema.scala | 207 .../apache/spark/sql/json/JSONRelation.scala| 203 --- .../spark/sql/json/JacksonGenerator.scala | 135 --
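Items 5 and 6 of the change list above (renaming DataSourceRegister.format to shortName and requiring the override modifier) look roughly like this for the built-in Parquet source; a minimal sketch based on the DefaultSource shown earlier in this diff, not a complete provider:

    trait DataSourceRegister {
      def shortName(): String
    }

    class DefaultSource extends DataSourceRegister {
      // Formerly `def format(): String`; now `shortName()`, and implementations must mark it `override`.
      override def shortName(): String = "parquet"
    }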
[02/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala deleted file mode 100644 index b415da5..000 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ /dev/null @@ -1,436 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag -import scala.reflect.runtime.universe.TypeTag - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} -import org.apache.parquet.example.data.simple.SimpleGroup -import org.apache.parquet.example.data.{Group, GroupWriter} -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.hadoop.api.WriteSupport.WriteContext -import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} -import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter} -import org.apache.parquet.io.api.RecordConsumer -import org.apache.parquet.schema.{MessageType, MessageTypeParser} - -import org.apache.spark.SparkException -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.types._ - -// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport -// with an empty configuration (it is after all not intended to be used in this way?) -// and members are private so we need to make our own in order to pass the schema -// to the writer. -private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] { - var groupWriter: GroupWriter = null - - override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { -groupWriter = new GroupWriter(recordConsumer, schema) - } - - override def init(configuration: Configuration): WriteContext = { -new WriteContext(schema, new java.util.HashMap[String, String]()) - } - - override def write(record: Group) { -groupWriter.write(record) - } -} - -/** - * A test suite that tests basic Parquet I/O. - */ -class ParquetIOSuite extends QueryTest with ParquetTest { - lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext - import sqlContext.implicits._ - - /** - * Writes `data` to a Parquet file, reads it back and check file contents. 
- */ - protected def checkParquetFile[T : Product : ClassTag: TypeTag](data: Seq[T]): Unit = { -withParquetDataFrame(data)(r = checkAnswer(r, data.map(Row.fromTuple))) - } - - test(basic data types (without binary)) { -val data = (1 to 4).map { i = - (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) -} -checkParquetFile(data) - } - - test(raw binary) { -val data = (1 to 4).map(i = Tuple1(Array.fill(3)(i.toByte))) -withParquetDataFrame(data) { df = - assertResult(data.map(_._1.mkString(,)).sorted) { -df.collect().map(_.getAs[Array[Byte]](0).mkString(,)).sorted - } -} - } - - test(string) { -val data = (1 to 4).map(i = Tuple1(i.toString)) -// Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL -// as we store Spark SQL schema in the extra metadata. -withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key - false)(checkParquetFile(data)) -withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key - true)(checkParquetFile(data)) - } - - test(fixed-length decimals) { -def makeDecimalRDD(decimal: DecimalType): DataFrame = - sqlContext.sparkContext -.parallelize(0 to 1000) -.map(i = Tuple1(i / 100.0)) -.toDF() -// Parquet doesn't allow column names with spaces, have to add an alias here -.select($_1 cast decimal as dec) - -for ((precision, scale) - Seq((5, 2), (1, 0), (1, 1), (18, 10), (18,
spark git commit: [SPARK-9633] [BUILD] SBT download locations outdated; need an update
Repository: spark Updated Branches: refs/heads/branch-1.1 326988e00 -> 119d6cf98

[SPARK-9633] [BUILD] SBT download locations outdated; need an update

Remove 2 defunct SBT download URLs and replace with the 1 known download URL. Also, use https.

Follow up on https://github.com/apache/spark/pull/7792

Author: Sean Owen so...@cloudera.com

Closes #7956 from srowen/SPARK-9633 and squashes the following commits:

caa40bd [Sean Owen] Remove 2 defunct SBT download URLs and replace with the 1 known download URL. Also, use https.

Conflicts:
	sbt/sbt-launch-lib.bash

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/119d6cf9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/119d6cf9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/119d6cf9

Branch: refs/heads/branch-1.1
Commit: 119d6cf987e59b92cca2753e921bc4e35dc37514
Parents: 326988e
Author: Sean Owen so...@cloudera.com
Authored: Thu Aug 6 23:43:52 2015 +0100
Committer: Davies Liu davies@gmail.com
Committed: Mon Aug 10 13:14:46 2015 -0700
--
 sbt/sbt-launch-lib.bash | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/119d6cf9/sbt/sbt-launch-lib.bash
--
diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash
index c91fecf..da22ea0 100755
--- a/sbt/sbt-launch-lib.bash
+++ b/sbt/sbt-launch-lib.bash
@@ -38,8 +38,7 @@ dlog () {
 acquire_sbt_jar () {
   SBT_VERSION=`awk -F "=" '/sbt\.version/ {print $2}' ./project/build.properties`
-  URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
-  URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+  URL1=https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
   JAR=sbt/sbt-launch-${SBT_VERSION}.jar

   sbt_jar=$JAR
@@ -51,9 +50,9 @@ acquire_sbt_jar () {
     printf "Attempting to fetch sbt\n"
     JAR_DL="${JAR}.part"
     if hash curl 2>/dev/null; then
-      (curl --progress-bar ${URL1} > ${JAR_DL} || curl --progress-bar ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+      curl --progress-bar --fail --location ${URL1} > ${JAR_DL} && mv ${JAR_DL} ${JAR}
     elif hash wget 2>/dev/null; then
-      (wget --progress=bar ${URL1} -O ${JAR_DL} || wget --progress=bar ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+      wget --progress=bar ${URL1} -O ${JAR_DL} && mv ${JAR_DL} ${JAR}
     else
       printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
       exit -1

--
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[14/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
[SPARK-9763][SQL] Minimize exposure of internal SQL classes. There are a few changes in this pull request: 1. Moved all data sources to execution.datasources, except the public JDBC APIs. 2. In order to maintain backward compatibility from 1, added a backward compatibility translation map in data source resolution. 3. Moved ui and metric package into execution. 4. Added more documentation on some internal classes. 5. Renamed DataSourceRegister.format - shortName. 6. Added override modifier on shortName. 7. Removed IntSQLMetric. Author: Reynold Xin r...@databricks.com Closes #8056 from rxin/SPARK-9763 and squashes the following commits: 9df4801 [Reynold Xin] Removed hardcoded name in test cases. d9babc6 [Reynold Xin] Shorten. e484419 [Reynold Xin] Removed VisibleForTesting. 171b812 [Reynold Xin] MimaExcludes. 2041389 [Reynold Xin] Compile ... 79dda42 [Reynold Xin] Compile. 0818ba3 [Reynold Xin] Removed IntSQLMetric. c46884f [Reynold Xin] Two more fixes. f9aa88d [Reynold Xin] [SPARK-9763][SQL] Minimize exposure of internal SQL classes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40ed2af5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40ed2af5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40ed2af5 Branch: refs/heads/master Commit: 40ed2af587cedadc6e5249031857a922b3b234ca Parents: 0fe6674 Author: Reynold Xin r...@databricks.com Authored: Mon Aug 10 13:49:23 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Aug 10 13:49:23 2015 -0700 -- project/MimaExcludes.scala | 24 +- apache.spark.sql.sources.DataSourceRegister |6 +- .../sql/execution/ui/static/spark-sql-viz.css | 37 + .../sql/execution/ui/static/spark-sql-viz.js| 160 +++ .../spark/sql/ui/static/spark-sql-viz.css | 37 - .../apache/spark/sql/ui/static/spark-sql-viz.js | 160 --- .../scala/org/apache/spark/sql/DataFrame.scala |2 +- .../org/apache/spark/sql/DataFrameReader.scala |6 +- .../org/apache/spark/sql/DataFrameWriter.scala |6 +- .../scala/org/apache/spark/sql/SQLContext.scala |2 +- .../spark/sql/execution/SQLExecution.scala |2 +- .../apache/spark/sql/execution/SparkPlan.scala |8 +- .../spark/sql/execution/basicOperators.scala|2 +- .../sql/execution/datasources/DDLParser.scala | 185 +++ .../execution/datasources/DefaultSource.scala | 64 + .../datasources/InsertIntoDataSource.scala | 23 +- .../datasources/ResolvedDataSource.scala| 204 +++ .../spark/sql/execution/datasources/ddl.scala | 352 +- .../datasources/jdbc/DefaultSource.scala| 62 + .../datasources/jdbc/DriverRegistry.scala | 60 + .../datasources/jdbc/DriverWrapper.scala| 48 + .../execution/datasources/jdbc/JDBCRDD.scala| 489 .../datasources/jdbc/JDBCRelation.scala | 113 ++ .../execution/datasources/jdbc/JdbcUtils.scala | 219 .../datasources/json/InferSchema.scala | 207 .../datasources/json/JSONRelation.scala | 204 +++ .../datasources/json/JacksonGenerator.scala | 135 ++ .../datasources/json/JacksonParser.scala| 228 .../datasources/json/JacksonUtils.scala | 32 + .../parquet/CatalystReadSupport.scala | 153 +++ .../parquet/CatalystRecordMaterializer.scala| 41 + .../parquet/CatalystRowConverter.scala | 449 +++ .../parquet/CatalystSchemaConverter.scala | 592 + .../parquet/DirectParquetOutputCommitter.scala | 87 ++ .../datasources/parquet/ParquetConverter.scala | 39 + .../datasources/parquet/ParquetFilters.scala| 360 ++ .../datasources/parquet/ParquetRelation.scala | 796 .../parquet/ParquetTableSupport.scala | 322 + .../parquet/ParquetTypesConverter.scala | 159 +++ 
.../spark/sql/execution/metric/SQLMetrics.scala | 115 ++ .../apache/spark/sql/execution/package.scala|8 +- .../sql/execution/ui/AllExecutionsPage.scala| 238 .../spark/sql/execution/ui/ExecutionPage.scala | 127 ++ .../spark/sql/execution/ui/SQLListener.scala| 352 ++ .../apache/spark/sql/execution/ui/SQLTab.scala | 49 + .../spark/sql/execution/ui/SparkPlanGraph.scala | 118 ++ .../org/apache/spark/sql/jdbc/JDBCRDD.scala | 490 .../apache/spark/sql/jdbc/JDBCRelation.scala| 152 --- .../org/apache/spark/sql/jdbc/JdbcUtils.scala | 52 - .../scala/org/apache/spark/sql/jdbc/jdbc.scala | 250 .../org/apache/spark/sql/json/InferSchema.scala | 207 .../apache/spark/sql/json/JSONRelation.scala| 203 --- .../spark/sql/json/JacksonGenerator.scala | 135 -- .../apache/spark/sql/json/JacksonParser.scala | 228 .../apache/spark/sql/json/JacksonUtils.scala| 32 - .../apache/spark/sql/metric/SQLMetrics.scala
[09/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala deleted file mode 100644 index 975fec1..000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.util.{Map = JMap} - -import scala.collection.JavaConversions.{iterableAsScalaIterable, mapAsJavaMap, mapAsScalaMap} - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.hadoop.api.ReadSupport.ReadContext -import org.apache.parquet.hadoop.api.{InitContext, ReadSupport} -import org.apache.parquet.io.api.RecordMaterializer -import org.apache.parquet.schema.MessageType - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.StructType - -private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging { - override def prepareForRead( - conf: Configuration, - keyValueMetaData: JMap[String, String], - fileSchema: MessageType, - readContext: ReadContext): RecordMaterializer[InternalRow] = { -log.debug(sPreparing for read Parquet file with message type: $fileSchema) - -val toCatalyst = new CatalystSchemaConverter(conf) -val parquetRequestedSchema = readContext.getRequestedSchema - -val catalystRequestedSchema = - Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata = -metadata - // First tries to read requested schema, which may result from projections - .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA) - // If not available, tries to read Catalyst schema from file metadata. It's only - // available if the target file is written by Spark SQL. - .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY)) - }.map(StructType.fromString).getOrElse { -logDebug(Catalyst schema not available, falling back to Parquet schema) -toCatalyst.convert(parquetRequestedSchema) - } - -logDebug(sCatalyst schema used to read Parquet files: $catalystRequestedSchema) -new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema) - } - - override def init(context: InitContext): ReadContext = { -val conf = context.getConfiguration - -// If the target file was written by Spark SQL, we should be able to find a serialized Catalyst -// schema of this file from its the metadata. 
-val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA)) - -// Optional schema of requested columns, in the form of a string serialized from a Catalyst -// `StructType` containing all requested columns. -val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) - -// Below we construct a Parquet schema containing all requested columns. This schema tells -// Parquet which columns to read. -// -// If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise, -// we have to fallback to the full file schema which contains all columns in the file. -// Obviously this may waste IO bandwidth since it may read more columns than requested. -// -// Two things to note: -// -// 1. It's possible that some requested columns don't exist in the target Parquet file. For -//example, in the case of schema merging, the globally merged schema may contain extra -//columns gathered from other Parquet files. These columns will be simply filled with nulls -//when actually reading the target Parquet file. -// -// 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to -//Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to -//
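For readers skimming the diff, the schema-selection order used in prepareForRead is easy to miss: the requested projection wins, then the schema Spark SQL wrote into the file metadata, then a conversion of the Parquet schema. Below is a small, self-contained sketch of just that ordering; the metadata map, the key strings, and the fallback function are stand-ins for illustration, while the real constants live in CatalystReadSupport.

object SchemaSelectionSketch {
  // Placeholder key names sketching the constants referenced in the diff above.
  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"

  // Returns the serialized Catalyst schema to use: requested projection first, then the
  // schema written by Spark SQL into the file metadata, then a converted Parquet schema.
  def chooseSchema(
      readSupportMetadata: Map[String, String],
      convertParquetSchema: () => String): String = {
    readSupportMetadata
      .get(SPARK_ROW_REQUESTED_SCHEMA)
      .orElse(readSupportMetadata.get(SPARK_METADATA_KEY))
      .getOrElse(convertParquetSchema())
  }

  def main(args: Array[String]): Unit = {
    val metadata = Map(SPARK_METADATA_KEY -> """{"type":"struct","fields":[]}""")
    println(chooseSchema(metadata, () => "<converted from the Parquet file schema>"))
  }
}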
[12/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala new file mode 100644 index 000..3542dfb --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.math.{BigDecimal, BigInteger} +import java.nio.ByteOrder + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.parquet.column.Dictionary +import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} +import org.apache.parquet.schema.Type.Repetition +import org.apache.parquet.schema.{GroupType, PrimitiveType, Type} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some + * corresponding parent container. For example, a converter for a `StructType` field may set + * converted values to a [[MutableRow]]; or a converter for array elements may append converted + * values to an [[ArrayBuffer]]. + */ +private[parquet] trait ParentContainerUpdater { + def set(value: Any): Unit = () + def setBoolean(value: Boolean): Unit = set(value) + def setByte(value: Byte): Unit = set(value) + def setShort(value: Short): Unit = set(value) + def setInt(value: Int): Unit = set(value) + def setLong(value: Long): Unit = set(value) + def setFloat(value: Float): Unit = set(value) + def setDouble(value: Double): Unit = set(value) +} + +/** A no-op updater used for root converter (who doesn't have a parent). */ +private[parquet] object NoopUpdater extends ParentContainerUpdater + +/** + * A [[CatalystRowConverter]] is used to convert Parquet structs into Spark SQL [[InternalRow]]s. + * Since any Parquet record is also a struct, this converter can also be used as root converter. + * + * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have + * any parent container. 
+ * + * @param parquetType Parquet schema of Parquet records + * @param catalystType Spark SQL schema that corresponds to the Parquet record type + * @param updater An updater which propagates converted field values to the parent container + */ +private[parquet] class CatalystRowConverter( +parquetType: GroupType, +catalystType: StructType, +updater: ParentContainerUpdater) + extends GroupConverter { + + /** + * Updater used together with field converters within a [[CatalystRowConverter]]. It propagates + * converted filed values to the `ordinal`-th cell in `currentRow`. + */ + private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { +override def set(value: Any): Unit = row(ordinal) = value +override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value) +override def setByte(value: Byte): Unit = row.setByte(ordinal, value) +override def setShort(value: Short): Unit = row.setShort(ordinal, value) +override def setInt(value: Int): Unit = row.setInt(ordinal, value) +override def setLong(value: Long): Unit = row.setLong(ordinal, value) +override def setDouble(value: Double): Unit = row.setDouble(ordinal, value) +override def setFloat(value: Float): Unit = row.setFloat(ordinal, value) + } + + /** + * Represents the converted row object once an entire Parquet record is converted. + * + * @todo Uses [[UnsafeRow]] for better performance. + */ + val currentRow = new
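The ParentContainerUpdater indirection is the core idea of this converter hierarchy: a child converter only knows how to `set` a converted value, while the parent decides whether that value lands in a row cell or is appended to an array buffer. A minimal sketch of the pattern follows, using invented stand-in classes rather than the Spark internals.

import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for ParentContainerUpdater: children call set, parents choose the container.
trait ParentContainerUpdaterSketch {
  def set(value: Any): Unit = ()
  def setInt(value: Int): Unit = set(value)
  def setLong(value: Long): Unit = set(value)
}

// Writes converted values into the `ordinal`-th cell of a row, like RowUpdater above.
final class RowCellUpdater(row: Array[Any], ordinal: Int) extends ParentContainerUpdaterSketch {
  override def set(value: Any): Unit = row(ordinal) = value
}

// Appends converted values to a buffer, the way array-element converters accumulate elements.
final class ArrayElementUpdater(buffer: ArrayBuffer[Any]) extends ParentContainerUpdaterSketch {
  override def set(value: Any): Unit = buffer += value
}

object UpdaterDemo {
  def main(args: Array[String]): Unit = {
    val row = new Array[Any](2)
    new RowCellUpdater(row, 0).setInt(42)     // lands in row(0)
    val elems = ArrayBuffer.empty[Any]
    val up = new ArrayElementUpdater(elems)
    Seq(1, 2, 3).foreach(up.setInt)           // accumulates array elements
    println(s"${row.toSeq} $elems")
  }
}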
[03/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala deleted file mode 100644 index 92022ff..000 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ /dev/null @@ -1,1172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.json - -import java.io.{File, StringWriter} -import java.sql.{Date, Timestamp} - -import com.fasterxml.jackson.core.JsonFactory -import org.apache.spark.rdd.RDD -import org.scalactic.Tolerance._ - -import org.apache.spark.sql.{SQLContext, QueryTest, Row, SQLConf} -import org.apache.spark.sql.TestData._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation} -import org.apache.spark.sql.json.InferSchema.compatibleType -import org.apache.spark.sql.types._ -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.util.Utils - -class JsonSuite extends QueryTest with SQLTestUtils with TestJsonData { - - protected lazy val ctx = org.apache.spark.sql.test.TestSQLContext - override def sqlContext: SQLContext = ctx // used by SQLTestUtils - - import ctx.sql - import ctx.implicits._ - - test(Type promotion) { -def checkTypePromotion(expected: Any, actual: Any) { - assert(expected.getClass == actual.getClass, -sFailed to promote ${actual.getClass} to ${expected.getClass}.) - assert(expected == actual, -sPromoted value ${actual}(${actual.getClass}) does not equal the expected value + - s${expected}(${expected.getClass}).) 
-} - -val factory = new JsonFactory() -def enforceCorrectType(value: Any, dataType: DataType): Any = { - val writer = new StringWriter() - val generator = factory.createGenerator(writer) - generator.writeObject(value) - generator.flush() - - val parser = factory.createParser(writer.toString) - parser.nextToken() - JacksonParser.convertField(factory, parser, dataType) -} - -val intNumber: Int = 2147483647 -checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType)) -checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, LongType)) -checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, DoubleType)) -checkTypePromotion( - Decimal(intNumber), enforceCorrectType(intNumber, DecimalType.SYSTEM_DEFAULT)) - -val longNumber: Long = 9223372036854775807L -checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType)) -checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, DoubleType)) -checkTypePromotion( - Decimal(longNumber), enforceCorrectType(longNumber, DecimalType.SYSTEM_DEFAULT)) - -val doubleNumber: Double = 1.7976931348623157E308d -checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType)) - -checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber)), -enforceCorrectType(intNumber, TimestampType)) -checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)), -enforceCorrectType(intNumber.toLong, TimestampType)) -val strTime = 2014-09-30 12:34:56 - checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), -enforceCorrectType(strTime, TimestampType)) - -val strDate = 2014-10-15 -checkTypePromotion( - DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType)) - -val ISO8601Time1 = 1970-01-01T01:00:01.0Z -checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)), -enforceCorrectType(ISO8601Time1, TimestampType)) -checkTypePromotion(DateTimeUtils.millisToDays(3601000), - enforceCorrectType(ISO8601Time1, DateType)) -val ISO8601Time2 = 1970-01-01T02:00:01-01:00 -checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)), -
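The deleted test drives Jackson directly to check that one JSON literal can be widened into several Catalyst types. A stripped-down sketch of the same idea using plain jackson-core is below; replacing the Spark-internal JacksonParser.convertField with direct parser getters is an assumption made only for illustration.

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

object TypePromotionSketch {
  private val factory = new JsonFactory()

  // Parses a single JSON scalar and reads it back with the supplied accessor.
  private def parsed[T](json: String)(read: JsonParser => T): T = {
    val parser = factory.createParser(json)
    try { parser.nextToken(); read(parser) } finally parser.close()
  }

  def main(args: Array[String]): Unit = {
    val intNumber = 2147483647
    // The same literal can be read at several widths, mirroring IntegerType/LongType/DoubleType.
    assert(parsed(intNumber.toString)(_.getIntValue) == intNumber)
    assert(parsed(intNumber.toString)(_.getLongValue) == intNumber.toLong)
    assert(parsed(intNumber.toString)(_.getDoubleValue) == intNumber.toDouble)
    println("promotions hold")
  }
}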
[01/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
Repository: spark Updated Branches: refs/heads/branch-1.5 d251d9ff0 - c1838e430 http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala index 1a4d41b..392da0b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala @@ -20,9 +20,37 @@ package org.apache.spark.sql.sources import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types.{StringType, StructField, StructType} + +// please note that the META-INF/services had to be modified for the test directory for this to work +class DDLSourceLoadSuite extends DataSourceTest { + + test(data sources with the same name) { +intercept[RuntimeException] { + caseInsensitiveContext.read.format(Fluet da Bomb).load() +} + } + + test(load data source from format alias) { +caseInsensitiveContext.read.format(gathering quorum).load().schema == + StructType(Seq(StructField(stringType, StringType, nullable = false))) + } + + test(specify full classname with duplicate formats) { + caseInsensitiveContext.read.format(org.apache.spark.sql.sources.FakeSourceOne) + .load().schema == StructType(Seq(StructField(stringType, StringType, nullable = false))) + } + + test(should fail to load ORC without HiveContext) { +intercept[ClassNotFoundException] { + caseInsensitiveContext.read.format(orc).load() +} + } +} + + class FakeSourceOne extends RelationProvider with DataSourceRegister { - def format(): String = Fluet da Bomb + def shortName(): String = Fluet da Bomb override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation = new BaseRelation { @@ -35,7 +63,7 @@ class FakeSourceOne extends RelationProvider with DataSourceRegister { class FakeSourceTwo extends RelationProvider with DataSourceRegister { - def format(): String = Fluet da Bomb + def shortName(): String = Fluet da Bomb override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation = new BaseRelation { @@ -48,7 +76,7 @@ class FakeSourceTwo extends RelationProvider with DataSourceRegister { class FakeSourceThree extends RelationProvider with DataSourceRegister { - def format(): String = gathering quorum + def shortName(): String = gathering quorum override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation = new BaseRelation { @@ -58,28 +86,3 @@ class FakeSourceThree extends RelationProvider with DataSourceRegister { StructType(Seq(StructField(stringType, StringType, nullable = false))) } } -// please note that the META-INF/services had to be modified for the test directory for this to work -class DDLSourceLoadSuite extends DataSourceTest { - - test(data sources with the same name) { -intercept[RuntimeException] { - caseInsensitiveContext.read.format(Fluet da Bomb).load() -} - } - - test(load data source from format alias) { -caseInsensitiveContext.read.format(gathering quorum).load().schema == - StructType(Seq(StructField(stringType, StringType, nullable = false))) - } - - test(specify full classname with duplicate formats) { - caseInsensitiveContext.read.format(org.apache.spark.sql.sources.FakeSourceOne) - .load().schema == StructType(Seq(StructField(stringType, StringType, nullable = false))) - } - - test(Loading Orc) { -intercept[ClassNotFoundException] 
{ - caseInsensitiveContext.read.format(orc).load() -} - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala index 3cbf546..27d1cd9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala @@ -22,14 +22,39 @@ import org.apache.spark.sql.execution.datasources.ResolvedDataSource class ResolvedDataSourceSuite extends SparkFunSuite { - test(builtin sources) { -assert(ResolvedDataSource.lookupDataSource(jdbc) === - classOf[org.apache.spark.sql.jdbc.DefaultSource]) + test(jdbc) { +assert( + ResolvedDataSource.lookupDataSource(jdbc) === + classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource]) +assert( +
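The suite above exercises the DataSourceRegister hook that this commit renames from `format()` to `shortName()`. Here is a hedged sketch of what a registered source looks like after the rename; the class name and the "example-source" alias are invented, and the class must also be listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister for the alias to resolve.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical data source illustrating the shortName() contract shown in the diff.
class ExampleSource extends RelationProvider with DataSourceRegister {

  override def shortName(): String = "example-source"

  override def createRelation(ctx: SQLContext, parameters: Map[String, String]): BaseRelation =
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType =
        StructType(Seq(StructField("stringType", StringType, nullable = false)))
    }
}

// Usage (assuming a SQLContext named sqlContext is in scope):
//   sqlContext.read.format("example-source").load().schema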
[06/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java -- diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java b/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java deleted file mode 100644 index 354c9d7..000 --- a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java +++ /dev/null @@ -1,1001 +0,0 @@ -/** - * Autogenerated by Avro - * - * DO NOT EDIT DIRECTLY - */ -package org.apache.spark.sql.parquet.test.avro; -@SuppressWarnings(all) -@org.apache.avro.specific.AvroGenerated -public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse({\type\:\record\,\name\:\ParquetAvroCompat\,\namespace\:\org.apache.spark.sql.parquet.test.avro\,\fields\:[{\name\:\bool_column\,\type\:\boolean\},{\name\:\int_column\,\type\:\int\},{\name\:\long_column\,\type\:\long\},{\name\:\float_column\,\type\:\float\},{\name\:\double_column\,\type\:\double\},{\name\:\binary_column\,\type\:\bytes\},{\name\:\string_column\,\type\:{\type\:\string\,\avro.java.string\:\String\}},{\name\:\maybe_bool_column\,\type\:[\null\,\boolean\]},{\name\:\maybe_int_column\,\type\:[\null\,\int\]},{\name\:\maybe_long_column\,\type\:[\null\,\long\]},{\name\:\maybe_float_column\,\type\:[\null\,\float\]},{\name\:\maybe_double_column\,\type\:[\null\,\double\]},{\name\:\maybe_binary_column\,\type\:[\null\,\bytes\]},{\name\:\maybe_string _column\,\type\:[\null\,{\type\:\string\,\avro.java.string\:\String\}]},{\name\:\strings_column\,\type\:{\type\:\array\,\items\:{\type\:\string\,\avro.java.string\:\String\}}},{\name\:\string_to_int_column\,\type\:{\type\:\map\,\values\:\int\,\avro.java.string\:\String\}},{\name\:\complex_column\,\type\:{\type\:\map\,\values\:{\type\:\array\,\items\:{\type\:\record\,\name\:\Nested\,\fields\:[{\name\:\nested_ints_column\,\type\:{\type\:\array\,\items\:\int\}},{\name\:\nested_string_column\,\type\:{\type\:\string\,\avro.java.string\:\String\}}]}},\avro.java.string\:\String\}}]}); - public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } - @Deprecated public boolean bool_column; - @Deprecated public int int_column; - @Deprecated public long long_column; - @Deprecated public float float_column; - @Deprecated public double double_column; - @Deprecated public java.nio.ByteBuffer binary_column; - @Deprecated public java.lang.String string_column; - @Deprecated public java.lang.Boolean maybe_bool_column; - @Deprecated public java.lang.Integer maybe_int_column; - @Deprecated public java.lang.Long maybe_long_column; - @Deprecated public java.lang.Float maybe_float_column; - @Deprecated public java.lang.Double maybe_double_column; - @Deprecated public java.nio.ByteBuffer maybe_binary_column; - @Deprecated public java.lang.String maybe_string_column; - @Deprecated public java.util.Listjava.lang.String strings_column; - @Deprecated public java.util.Mapjava.lang.String,java.lang.Integer string_to_int_column; - @Deprecated public java.util.Mapjava.lang.String,java.util.Listorg.apache.spark.sql.parquet.test.avro.Nested complex_column; - - /** - * Default constructor. Note that this does not initialize fields - * to their default values from the schema. If that is desired then - * one should use codenewBuilder()/code. 
- */ - public ParquetAvroCompat() {} - - /** - * All-args constructor. - */ - public ParquetAvroCompat(java.lang.Boolean bool_column, java.lang.Integer int_column, java.lang.Long long_column, java.lang.Float float_column, java.lang.Double double_column, java.nio.ByteBuffer binary_column, java.lang.String string_column, java.lang.Boolean maybe_bool_column, java.lang.Integer maybe_int_column, java.lang.Long maybe_long_column, java.lang.Float maybe_float_column, java.lang.Double maybe_double_column, java.nio.ByteBuffer maybe_binary_column, java.lang.String maybe_string_column, java.util.Listjava.lang.String strings_column, java.util.Mapjava.lang.String,java.lang.Integer string_to_int_column, java.util.Mapjava.lang.String,java.util.Listorg.apache.spark.sql.parquet.test.avro.Nested complex_column) { -this.bool_column = bool_column; -this.int_column = int_column; -this.long_column = long_column; -this.float_column = float_column; -this.double_column = double_column; -this.binary_column = binary_column; -this.string_column = string_column; -this.maybe_bool_column = maybe_bool_column; -this.maybe_int_column = maybe_int_column; -this.maybe_long_column = maybe_long_column; -this.maybe_float_column = maybe_float_column; -this.maybe_double_column =
[13/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala new file mode 100644 index 000..8eab6a0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.jdbc + +import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException} +import java.util.Properties + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.jdbc.JdbcDialects +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} + +/** + * Data corresponding to one partition of a JDBCRDD. + */ +private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition { + override def index: Int = idx +} + + +private[sql] object JDBCRDD extends Logging { + + /** + * Maps a JDBC type to a Catalyst type. This function is called only when + * the JdbcDialect class corresponding to your database driver returns null. + * + * @param sqlType - A field of java.sql.Types + * @return The Catalyst type corresponding to sqlType. 
 */
+  private def getCatalystType(
+      sqlType: Int,
+      precision: Int,
+      scale: Int,
+      signed: Boolean): DataType = {
+    val answer = sqlType match {
+      // scalastyle:off
+      case java.sql.Types.ARRAY         => null
+      case java.sql.Types.BIGINT        => if (signed) { LongType } else { DecimalType(20,0) }
+      case java.sql.Types.BINARY        => BinaryType
+      case java.sql.Types.BIT           => BooleanType // @see JdbcDialect for quirks
+      case java.sql.Types.BLOB          => BinaryType
+      case java.sql.Types.BOOLEAN       => BooleanType
+      case java.sql.Types.CHAR          => StringType
+      case java.sql.Types.CLOB          => StringType
+      case java.sql.Types.DATALINK      => null
+      case java.sql.Types.DATE          => DateType
+      case java.sql.Types.DECIMAL
+        if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
+      case java.sql.Types.DECIMAL       => DecimalType.SYSTEM_DEFAULT
+      case java.sql.Types.DISTINCT      => null
+      case java.sql.Types.DOUBLE        => DoubleType
+      case java.sql.Types.FLOAT         => FloatType
+      case java.sql.Types.INTEGER       => if (signed) { IntegerType } else { LongType }
+      case java.sql.Types.JAVA_OBJECT   => null
+      case java.sql.Types.LONGNVARCHAR  => StringType
+      case java.sql.Types.LONGVARBINARY => BinaryType
+      case java.sql.Types.LONGVARCHAR   => StringType
+      case java.sql.Types.NCHAR         => StringType
+      case java.sql.Types.NCLOB         => StringType
+      case java.sql.Types.NULL          => null
+      case java.sql.Types.NUMERIC
+        if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
+      case java.sql.Types.NUMERIC       => DecimalType.SYSTEM_DEFAULT
+      case java.sql.Types.NVARCHAR      => StringType
+      case java.sql.Types.OTHER         => null
+      case java.sql.Types.REAL          => DoubleType
+      case java.sql.Types.REF           => StringType
+      case java.sql.Types.ROWID         => LongType
+      case java.sql.Types.SMALLINT      => IntegerType
+      case java.sql.Types.SQLXML        => StringType
+      case java.sql.Types.STRUCT        => StringType
+      case java.sql.Types.TIME          => TimestampType
+      case java.sql.Types.TIMESTAMP     => TimestampType
+      case java.sql.Types.TINYINT       => IntegerType
+      case
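The mapping above turns a java.sql.Types code (plus precision, scale, and signedness) into a Catalyst type. A reduced sketch of how such a mapping is typically driven from a query's ResultSetMetaData follows; `toCatalyst` here covers only a handful of codes and falls back to StringType, unlike the full table in JDBCRDD, so treat it as an illustration rather than the real rules.

import java.sql.ResultSetMetaData

import org.apache.spark.sql.types._

object JdbcTypeSketch {

  // Invented, reduced stand-in for getCatalystType above.
  def toCatalyst(sqlType: Int, precision: Int, scale: Int, signed: Boolean): DataType =
    sqlType match {
      case java.sql.Types.BIGINT  => if (signed) LongType else DecimalType(20, 0)
      case java.sql.Types.INTEGER => if (signed) IntegerType else LongType
      case java.sql.Types.VARCHAR => StringType
      case java.sql.Types.DECIMAL if precision != 0 || scale != 0 =>
        DecimalType.bounded(precision, scale)
      case _ => StringType // fallback for this sketch only; the real code signals unknown types
    }

  // Builds a StructType from the columns described by a ResultSetMetaData.
  def schemaOf(md: ResultSetMetaData): StructType = {
    val fields = (1 to md.getColumnCount).map { i =>
      val dt = toCatalyst(md.getColumnType(i), md.getPrecision(i), md.getScale(i), md.isSigned(i))
      StructField(md.getColumnLabel(i), dt, md.isNullable(i) != ResultSetMetaData.columnNoNulls)
    }
    StructType(fields)
  }
}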
[05/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala new file mode 100644 index 000..6b62c9a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.json + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext + +trait TestJsonData { + + protected def ctx: SQLContext + + def primitiveFieldAndType: RDD[String] = +ctx.sparkContext.parallelize( + {string:this is a simple string., + integer:10, + long:21474836470, + bigInteger:92233720368547758070, + double:1.7976931348623157E308, + boolean:true, + null:null + } :: Nil) + + def primitiveFieldValueTypeConflict: RDD[String] = +ctx.sparkContext.parallelize( + {num_num_1:11, num_num_2:null, num_num_3: 1.1, + num_bool:true, num_str:13.1, str_bool:str1} :: + {num_num_1:null, num_num_2:21474836470.9, num_num_3: null, + num_bool:12, num_str:null, str_bool:true} :: + {num_num_1:21474836470, num_num_2:92233720368547758070, num_num_3: 100, + num_bool:false, num_str:str1, str_bool:false} :: + {num_num_1:21474836570, num_num_2:1.1, num_num_3: 21474836470, + num_bool:null, num_str:92233720368547758070, str_bool:null} :: Nil) + + def jsonNullStruct: RDD[String] = +ctx.sparkContext.parallelize( + {nullstr:,ip:27.31.100.29,headers:{Host:1.abc.com,Charset:UTF-8}} :: +{nullstr:,ip:27.31.100.29,headers:{}} :: +{nullstr:,ip:27.31.100.29,headers:} :: +{nullstr:null,ip:27.31.100.29,headers:null} :: Nil) + + def complexFieldValueTypeConflict: RDD[String] = +ctx.sparkContext.parallelize( + {num_struct:11, str_array:[1, 2, 3], + array:[], struct_array:[], struct: {}} :: + {num_struct:{field:false}, str_array:null, + array:null, struct_array:{}, struct: null} :: + {num_struct:null, str_array:str, + array:[4, 5, 6], struct_array:[7, 8, 9], struct: {field:null}} :: + {num_struct:{}, str_array:[str1, str2, 33], + array:[7], struct_array:{field: true}, struct: {field: str}} :: Nil) + + def arrayElementTypeConflict: RDD[String] = +ctx.sparkContext.parallelize( + {array1: [1, 1.1, true, null, [], {}, [2,3,4], {field:str}], + array2: [{field:214748364700}, {field:1}]} :: + {array3: [{field:str}, {field:1}]} :: + {array3: [1, 2, 3]} :: Nil) + + def missingFields: RDD[String] = +ctx.sparkContext.parallelize( + {a:true} :: + {b:21474836470} :: + {c:[33, 44]} :: + {d:{field:true}} :: + {e:str} :: Nil) + + def complexFieldAndType1: RDD[String] = 
+ctx.sparkContext.parallelize( + {struct:{field1: true, field2: 92233720368547758070}, + structWithArrayFields:{field1:[4, 5, 6], field2:[str1, str2]}, + arrayOfString:[str1, str2], + arrayOfInteger:[1, 2147483647, -2147483648], + arrayOfLong:[21474836470, 9223372036854775807, -9223372036854775808], + arrayOfBigInteger:[922337203685477580700, -922337203685477580800], + arrayOfDouble:[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], + arrayOfBoolean:[true, false, true], + arrayOfNull:[null, null, null, null], + arrayOfStruct:[{field1: true, field2: str1}, {field1: false}, {field3: null}], + arrayOfArray1:[[1, 2, 3], [str1, str2]], + arrayOfArray2:[[1, 2, 3], [1.1, 2.1, 3.1]] + } :: Nil) + + def complexFieldAndType2: RDD[String] = +ctx.sparkContext.parallelize( + {arrayOfStruct:[{field1: true, field2: str1}, {field1: false}, {field3: null}], + complexArrayOfStruct: [ + { +field1: [ +
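Each fixture above is an RDD[String] with one JSON document per element. A hedged usage sketch of how a suite mixing in TestJsonData typically consumes one of them, assuming a SQLContext named ctx (the trait's `ctx` member) and Spark 1.5's DataFrameReader.json(RDD[String]) overload:

// Read a fixture through the JSON data source and inspect the inferred schema.
val df = ctx.read.json(primitiveFieldAndType)
df.printSchema()                        // exact inferred types are decided by InferSchema
df.registerTempTable("jsonTable")
ctx.sql("SELECT string, integer FROM jsonTable").show()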
[07/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala
deleted file mode 100644
index 1ba50b9..000
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.ui
-
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.mutable
-
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.metric.{SQLMetricParam, SQLMetricValue}
-
-/**
- * A graph used for storing information of an executionPlan of DataFrame.
- *
- * Each graph is defined with a set of nodes and a set of edges. Each node represents a node in the
- * SparkPlan tree, and each edge represents a parent-child relationship between two nodes.
- */
-private[ui] case class SparkPlanGraph(
-    nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) {
-
-  def makeDotFile(metrics: Map[Long, Any]): String = {
-    val dotFile = new StringBuilder
-    dotFile.append("digraph G {\n")
-    nodes.foreach(node => dotFile.append(node.makeDotNode(metrics) + "\n"))
-    edges.foreach(edge => dotFile.append(edge.makeDotEdge + "\n"))
-    dotFile.append("}")
-    dotFile.toString()
-  }
-}
-
-private[sql] object SparkPlanGraph {
-
-  /**
-   * Build a SparkPlanGraph from the root of a SparkPlan tree.
-   */
-  def apply(plan: SparkPlan): SparkPlanGraph = {
-    val nodeIdGenerator = new AtomicLong(0)
-    val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
-    val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
-    buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
-    new SparkPlanGraph(nodes, edges)
-  }
-
-  private def buildSparkPlanGraphNode(
-      plan: SparkPlan,
-      nodeIdGenerator: AtomicLong,
-      nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
-      edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
-    val metrics = plan.metrics.toSeq.map { case (key, metric) =>
-      SQLPlanMetric(metric.name.getOrElse(key), metric.id,
-        metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]])
-    }
-    val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics)
-    nodes += node
-    val childrenNodes = plan.children.map(
-      child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
-    for (child <- childrenNodes) {
-      edges += SparkPlanGraphEdge(child.id, node.id)
-    }
-    node
-  }
-}
-
-/**
- * Represent a node in the SparkPlan tree, along with its metrics.
- *
- * @param id generated by SparkPlanGraph. There is no duplicate id in a graph
- * @param name the name of this SparkPlan node
- * @param metrics metrics that this SparkPlan node will track
- */
-private[ui] case class SparkPlanGraphNode(
-    id: Long, name: String, desc: String, metrics: Seq[SQLPlanMetric]) {
-
-  def makeDotNode(metricsValue: Map[Long, Any]): String = {
-    val values = {
-      for (metric <- metrics;
-           value <- metricsValue.get(metric.accumulatorId)) yield {
-        metric.name + ": " + value
-      }
-    }
-    val label = if (values.isEmpty) {
-      name
-    } else {
-      // If there are metrics, display all metrics in a separate line. We should use an escaped
-      // "\n" here to follow the dot syntax.
-      //
-      // Note: whitespace between two "\n"s is to create an empty line between the name of
-      // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
-      name + "\\n \\n" + values.mkString("\\n")
-    }
-    s"""  $id [label="$label"];"""
-  }
-}
-
-/**
- * Represent an edge in the SparkPlan tree. `fromId` is the parent node id, and `toId` is the child
- * node id.
- */
-private[ui] case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
-
-  def makeDotEdge: String = s"  $fromId->$toId;\n"
-}
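SparkPlanGraph assigns node ids from an AtomicLong while walking the plan tree and then renders Graphviz dot. Below is a toy re-creation of that walk over an invented `Plan` node type (not Spark's SparkPlan), kept only to show the id assignment and the child-to-parent edge direction used above.

import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable

// Invented stand-in for a plan node; the dot output follows the same "id [label=...]" shape.
case class Plan(name: String, children: Seq[Plan] = Nil)

object DotSketch {
  def toDot(root: Plan): String = {
    val ids = new AtomicLong(0)
    val nodes = mutable.ArrayBuffer[String]()
    val edges = mutable.ArrayBuffer[String]()

    def walk(p: Plan): Long = {
      val id = ids.getAndIncrement()
      nodes += s"""  $id [label="${p.name}"];"""
      // Edges point from child to parent, matching SparkPlanGraphEdge(child.id, node.id).
      p.children.map(walk).foreach(child => edges += s"  $child->$id;")
      id
    }

    walk(root)
    (Seq("digraph G {") ++ nodes ++ edges ++ Seq("}")).mkString("\n")
  }

  def main(args: Array[String]): Unit =
    println(toDot(Plan("Project", Seq(Plan("Filter", Seq(Plan("Scan")))))))
}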
[01/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
Repository: spark Updated Branches: refs/heads/master 0fe66744f - 40ed2af58 http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala index 1a4d41b..392da0b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala @@ -20,9 +20,37 @@ package org.apache.spark.sql.sources import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types.{StringType, StructField, StructType} + +// please note that the META-INF/services had to be modified for the test directory for this to work +class DDLSourceLoadSuite extends DataSourceTest { + + test(data sources with the same name) { +intercept[RuntimeException] { + caseInsensitiveContext.read.format(Fluet da Bomb).load() +} + } + + test(load data source from format alias) { +caseInsensitiveContext.read.format(gathering quorum).load().schema == + StructType(Seq(StructField(stringType, StringType, nullable = false))) + } + + test(specify full classname with duplicate formats) { + caseInsensitiveContext.read.format(org.apache.spark.sql.sources.FakeSourceOne) + .load().schema == StructType(Seq(StructField(stringType, StringType, nullable = false))) + } + + test(should fail to load ORC without HiveContext) { +intercept[ClassNotFoundException] { + caseInsensitiveContext.read.format(orc).load() +} + } +} + + class FakeSourceOne extends RelationProvider with DataSourceRegister { - def format(): String = Fluet da Bomb + def shortName(): String = Fluet da Bomb override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation = new BaseRelation { @@ -35,7 +63,7 @@ class FakeSourceOne extends RelationProvider with DataSourceRegister { class FakeSourceTwo extends RelationProvider with DataSourceRegister { - def format(): String = Fluet da Bomb + def shortName(): String = Fluet da Bomb override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation = new BaseRelation { @@ -48,7 +76,7 @@ class FakeSourceTwo extends RelationProvider with DataSourceRegister { class FakeSourceThree extends RelationProvider with DataSourceRegister { - def format(): String = gathering quorum + def shortName(): String = gathering quorum override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation = new BaseRelation { @@ -58,28 +86,3 @@ class FakeSourceThree extends RelationProvider with DataSourceRegister { StructType(Seq(StructField(stringType, StringType, nullable = false))) } } -// please note that the META-INF/services had to be modified for the test directory for this to work -class DDLSourceLoadSuite extends DataSourceTest { - - test(data sources with the same name) { -intercept[RuntimeException] { - caseInsensitiveContext.read.format(Fluet da Bomb).load() -} - } - - test(load data source from format alias) { -caseInsensitiveContext.read.format(gathering quorum).load().schema == - StructType(Seq(StructField(stringType, StringType, nullable = false))) - } - - test(specify full classname with duplicate formats) { - caseInsensitiveContext.read.format(org.apache.spark.sql.sources.FakeSourceOne) - .load().schema == StructType(Seq(StructField(stringType, StringType, nullable = false))) - } - - test(Loading Orc) { -intercept[ClassNotFoundException] { - 
caseInsensitiveContext.read.format(orc).load() -} - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala index 3cbf546..27d1cd9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala @@ -22,14 +22,39 @@ import org.apache.spark.sql.execution.datasources.ResolvedDataSource class ResolvedDataSourceSuite extends SparkFunSuite { - test(builtin sources) { -assert(ResolvedDataSource.lookupDataSource(jdbc) === - classOf[org.apache.spark.sql.jdbc.DefaultSource]) + test(jdbc) { +assert( + ResolvedDataSource.lookupDataSource(jdbc) === + classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource]) +assert( +