spark git commit: [SPARK-21273][SQL] Propagate logical plan stats using visitor pattern and mixin
Repository: spark Updated Branches: refs/heads/master 61b5df567 -> b1d719e7c [SPARK-21273][SQL] Propagate logical plan stats using visitor pattern and mixin ## What changes were proposed in this pull request? We currently implement statistics propagation directly in logical plan. Given we already have two different implementations, it'd make sense to actually decouple the two and add stats propagation using a mixin. This would reduce the coupling between logical plan and statistics handling. This can also be a powerful pattern in the future to add additional properties (e.g. constraints). ## How was this patch tested? Should be covered by existing test cases. Author: Reynold Xin. Closes #18479 from rxin/stats-trait. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1d719e7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1d719e7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1d719e7 Branch: refs/heads/master Commit: b1d719e7c9faeb5661a7e712b3ecefca56bf356f Parents: 61b5df5 Author: Reynold Xin Authored: Fri Jun 30 21:10:23 2017 -0700 Committer: gatorsmile Committed: Fri Jun 30 21:10:23 2017 -0700 -- .../spark/sql/catalyst/catalog/interface.scala | 2 +- .../catalyst/plans/logical/LocalRelation.scala | 5 +- .../catalyst/plans/logical/LogicalPlan.scala| 61 +-- .../plans/logical/LogicalPlanVisitor.scala | 87 ++ .../plans/logical/basicLogicalOperators.scala | 128 +-- .../sql/catalyst/plans/logical/hints.scala | 5 - .../statsEstimation/BasicStatsPlanVisitor.scala | 82 ++ .../statsEstimation/LogicalPlanStats.scala | 50 ++ .../SizeInBytesOnlyStatsPlanVisitor.scala | 163 +++ .../BasicStatsEstimationSuite.scala | 44 - .../StatsEstimationTestBase.scala | 2 +- .../spark/sql/execution/ExistingRDD.scala | 4 +- .../execution/columnar/InMemoryRelation.scala | 2 +- .../execution/datasources/LogicalRelation.scala | 7 +- .../spark/sql/execution/streaming/memory.scala | 3 +- .../PruneFileSourcePartitionsSuite.scala| 2 +- 16 files changed, 409 insertions(+), 238 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index da50b0e..9531456 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -438,7 +438,7 @@ case class CatalogRelation( case (attr, index) => attr.withExprId(ExprId(index + dataCols.length)) }) - override def computeStats: Statistics = { + override def computeStats(): Statistics = { // For data source tables, we will create a `LogicalRelation` and won't call this method, for // hive serde tables, we will always generate a statistics. // TODO: unify the table stats generation. 
http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index dc2add6..1c986fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -66,9 +66,8 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) } } - override def computeStats: Statistics = -Statistics(sizeInBytes = - output.map(n => BigInt(n.dataType.defaultSize)).sum * data.length) + override def computeStats(): Statistics = +Statistics(sizeInBytes = output.map(n => BigInt(n.dataType.defaultSize)).sum * data.length) def toSQL(inlineTableName: String): String = { require(data.nonEmpty) http://git-wip-us.apache.org/repos/asf/spark/blob/b1d719e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
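The decoupling described in SPARK-21273 above can be illustrated with a small, self-contained sketch. This is not the actual Spark code: Plan, PlanStats, PlanVisitor and SizeOnlyStats below are simplified stand-ins for LogicalPlan, LogicalPlanStats, LogicalPlanVisitor and SizeInBytesOnlyStatsPlanVisitor, and the estimation rules are deliberately naive.

```
// Toy model: all estimation logic lives in a visitor; a mixin gives plan
// nodes a cached `stats` method without them owning any estimation code.
case class Stats(sizeInBytes: BigInt)

sealed trait Plan extends PlanStats { def children: Seq[Plan] }
case class Relation(tableSize: BigInt) extends Plan { val children = Nil }
case class Filter(child: Plan) extends Plan { val children = Seq(child) }
case class Join(left: Plan, right: Plan) extends Plan { val children = Seq(left, right) }

// One visit method per node type, dispatched by pattern matching.
trait PlanVisitor[T] {
  def visit(p: Plan): T = p match {
    case r: Relation => visitRelation(r)
    case f: Filter   => visitFilter(f)
    case j: Join     => visitJoin(j)
  }
  def visitRelation(p: Relation): T
  def visitFilter(p: Filter): T
  def visitJoin(p: Join): T
}

// Size-only estimation, analogous in spirit to SizeInBytesOnlyStatsPlanVisitor.
object SizeOnlyStats extends PlanVisitor[Stats] {
  def visitRelation(p: Relation): Stats = Stats(p.tableSize)
  def visitFilter(p: Filter): Stats = p.child.stats // pass-through
  def visitJoin(p: Join): Stats = Stats(p.left.stats.sizeInBytes * p.right.stats.sizeInBytes)
}

// The mixin: the plan trait gains `stats`, keeping plan classes decoupled
// from statistics handling.
trait PlanStats { self: Plan =>
  private var cached: Option[Stats] = None
  def stats: Stats = cached.getOrElse {
    val s = SizeOnlyStats.visit(self)
    cached = Some(s)
    s
  }
}
```

Adding another derived property later (constraints, for example) would then only require a new visitor and mixin rather than changes to every plan node, which is the extensibility argument made in the description.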
spark git commit: [SPARK-21127][SQL] Update statistics after data changing commands
Repository: spark Updated Branches: refs/heads/master 4eb41879c -> 61b5df567 [SPARK-21127][SQL] Update statistics after data changing commands ## What changes were proposed in this pull request? Update stats after the following data changing commands: - InsertIntoHadoopFsRelationCommand - InsertIntoHiveTable - LoadDataCommand - TruncateTableCommand - AlterTableSetLocationCommand - AlterTableDropPartitionCommand ## How was this patch tested? Added new test cases. Author: wangzhenhua. Author: Zhenhua Wang. Closes #18334 from wzhfy/changeStatsForOperation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61b5df56 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61b5df56 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61b5df56 Branch: refs/heads/master Commit: 61b5df567eb8ae0df4059cb0e334316fff462de9 Parents: 4eb4187 Author: wangzhenhua Authored: Sat Jul 1 10:01:44 2017 +0800 Committer: Wenchen Fan Committed: Sat Jul 1 10:01:44 2017 +0800 -- .../org/apache/spark/sql/internal/SQLConf.scala | 10 + .../sql/execution/command/CommandUtils.scala| 17 +- .../spark/sql/execution/command/ddl.scala | 15 +- .../spark/sql/StatisticsCollectionSuite.scala | 77 +--- .../apache/spark/sql/hive/StatisticsSuite.scala | 187 --- 5 files changed, 207 insertions(+), 99 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c641e4d..25152f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -774,6 +774,14 @@ object SQLConf { .doubleConf .createWithDefault(0.05) + val AUTO_UPDATE_SIZE = +buildConf("spark.sql.statistics.autoUpdate.size") + .doc("Enables automatic update for table size once table's data is changed. 
Note that if " + +"the total number of files of the table is very large, this can be expensive and slow " + +"down data change commands.") + .booleanConf + .createWithDefault(false) + val CBO_ENABLED = buildConf("spark.sql.cbo.enabled") .doc("Enables CBO for estimation of plan statistics when set true.") @@ -1083,6 +1091,8 @@ class SQLConf extends Serializable with Logging { def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED) + def autoUpdateSize: Boolean = getConf(SQLConf.AUTO_UPDATE_SIZE) + def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED) def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD) http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 9239760..fce12cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -36,7 +36,14 @@ object CommandUtils extends Logging { def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = { if (table.stats.nonEmpty) { val catalog = sparkSession.sessionState.catalog - catalog.alterTableStats(table.identifier, None) + if (sparkSession.sessionState.conf.autoUpdateSize) { +val newTable = catalog.getTableMetadata(table.identifier) +val newSize = CommandUtils.calculateTotalSize(sparkSession.sessionState, newTable) +val newStats = CatalogStatistics(sizeInBytes = newSize) +catalog.alterTableStats(table.identifier, Some(newStats)) + } else { +catalog.alterTableStats(table.identifier, None) + } } } @@ -84,7 +91,9 @@ object CommandUtils extends Logging { size } -locationUri.map { p => +val startTime = System.nanoTime() +logInfo(s"Starting to calculate the total file size under path $locationUri.") +val size = locationUri.map { p => val path = new Path(p) try { val fs = path.getFileSystem(sessionState.newHadoopConf()) @@ -97,6 +106,10 @@ object CommandUtils extends Logging { 0L } }.getOrElse(0L) +val durationInMs
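A sketch of how the new flag from SPARK-21127 might be exercised. The config key spark.sql.statistics.autoUpdate.size comes from the diff above; the table name and session setup are hypothetical, and per the CommandUtils.updateTableStats change the refresh only happens for tables that already have statistics recorded.

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("auto-update-size-demo")
  // Off by default; when enabled, table size stats are recalculated after
  // data-changing commands such as INSERT, LOAD DATA and TRUNCATE.
  .config("spark.sql.statistics.autoUpdate.size", "true")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE TABLE t (id INT) USING parquet")
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")      // records the initial stats
spark.sql("INSERT INTO t VALUES (1), (2), (3)")      // with the flag on, sizeInBytes is refreshed
spark.sql("DESC EXTENDED t").show(truncate = false)  // detailed info should reflect the new size
```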
spark git commit: [SPARK-17528][SQL] data should be copied properly before saving into InternalRow
Repository: spark Updated Branches: refs/heads/master fd1325522 -> 4eb41879c [SPARK-17528][SQL] data should be copied properly before saving into InternalRow ## What changes were proposed in this pull request? For performance reasons, `UnsafeRow.getString`, `getStruct`, etc. return a "pointer" that points to a memory region of this unsafe row. This makes the unsafe projection a little dangerous, because all of its output rows share one instance. When we implement SQL operators, we should be careful not to cache the input rows, because they may be produced by an unsafe projection from the child operator and thus their content may change over time. However, when updating values of InternalRow (e.g. in mutable projection and safe projection), we only copy UTF8String; we should also copy InternalRow, ArrayData and MapData. This PR fixes this, and also fixes the copy of various InternalRow, ArrayData and MapData implementations. ## How was this patch tested? New regression tests. Author: Wenchen Fan. Closes #18483 from cloud-fan/fix-copy. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4eb41879 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4eb41879 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4eb41879 Branch: refs/heads/master Commit: 4eb41879ce774dec1d16b2281ab1fbf41f9d418a Parents: fd13255 Author: Wenchen Fan Authored: Sat Jul 1 09:25:29 2017 +0800 Committer: Wenchen Fan Committed: Sat Jul 1 09:25:29 2017 +0800 -- .../apache/spark/unsafe/types/UTF8String.java | 6 ++ .../apache/spark/sql/catalyst/InternalRow.scala | 27 - .../spark/sql/catalyst/expressions/Cast.scala | 2 +- .../expressions/SpecificInternalRow.scala | 12 --- .../expressions/aggregate/collect.scala | 2 +- .../expressions/aggregate/interfaces.scala | 6 ++ .../expressions/codegen/CodeGenerator.scala | 6 +- .../codegen/GenerateSafeProjection.scala| 2 - .../spark/sql/catalyst/expressions/rows.scala | 23 ++-- .../sql/catalyst/util/GenericArrayData.scala| 10 +- .../scala/org/apache/spark/sql/RowTest.scala| 4 - .../sql/catalyst/expressions/MapDataSuite.scala | 57 -- .../codegen/GeneratedProjectionSuite.scala | 36 +++ .../sql/catalyst/util/ComplexDataSuite.scala| 107 +++ .../sql/execution/vectorized/ColumnarBatch.java | 2 +- .../SortBasedAggregationIterator.scala | 15 +-- .../columnar/GenerateColumnAccessor.scala | 1 - .../execution/window/AggregateProcessor.scala | 7 +- 18 files changed, 212 insertions(+), 113 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4eb41879/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java -- diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 40b9fc9..9de4ca7 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -1088,6 +1088,12 @@ public final class UTF8String implements Comparable, Externalizable, return fromBytes(getBytes()); } + public UTF8String copy() { +byte[] bytes = new byte[numBytes]; +copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes); +return fromBytes(bytes); + } + @Override public int compareTo(@Nonnull final UTF8String other) { int len = Math.min(numBytes, other.numBytes); http://git-wip-us.apache.org/repos/asf/spark/blob/4eb41879/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala -- diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala index 256f64e..2911064 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.types.{DataType, Decimal, StructType} +import org.apache.spark.unsafe.types.UTF8String /** * An abstract class for row used internally in Spark SQL, which only contains the columns as @@ -33,6 +35,10 @@ abstract class InternalRow extends SpecializedGetters with Serializable { def setNullAt(i: Int): Unit + /** + * Updates
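The sharing hazard described in SPARK-17528 above can be reproduced with a few lines against Catalyst's internal API. This is an illustrative sketch, not part of the patch; it assumes the internal UnsafeProjection and InternalRow constructors behave as the PR text describes, i.e. the projection reuses a single output row.

```
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

val proj = UnsafeProjection.create(Array[DataType](StringType))
val buffered = ArrayBuffer[InternalRow]()

Seq("a", "b", "c").foreach { s =>
  val out = proj(InternalRow(UTF8String.fromString(s)))
  buffered += out           // wrong: every element aliases the same reused row
  // buffered += out.copy() // right: materialize a private copy before caching it
}
```

The patch extends the same discipline to mutable and safe projections: when an InternalRow field is written, nested UTF8String, InternalRow, ArrayData and MapData values are copied as well.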
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.2.0-rc6 [created] a2c7b2133
[2/2] spark git commit: Preparing development version 2.2.1-SNAPSHOT
Preparing development version 2.2.1-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85fddf40 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85fddf40 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85fddf40 Branch: refs/heads/branch-2.2 Commit: 85fddf406429dac00ddfb2e6c30870da450455bd Parents: a2c7b21 Author: Patrick WendellAuthored: Fri Jun 30 15:54:39 2017 -0700 Committer: Patrick Wendell Committed: Fri Jun 30 15:54:39 2017 -0700 -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- python/pyspark/version.py | 2 +- repl/pom.xml | 2 +- resource-managers/mesos/pom.xml | 2 +- resource-managers/yarn/pom.xml| 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- 38 files changed, 39 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/85fddf40/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 879c1f8..cfa49b9 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,6 +1,6 @@ Package: SparkR Type: Package -Version: 2.2.0 +Version: 2.2.1 Title: R Frontend for Apache Spark Description: The SparkR package provides an R Frontend for Apache Spark. 
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), http://git-wip-us.apache.org/repos/asf/spark/blob/85fddf40/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 3a7003f..da7b0c9 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.2.0 +2.2.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/85fddf40/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 5e9ffd1..7577253 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.2.0 +2.2.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/85fddf40/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index c3e10d1..558864a 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.2.0 +2.2.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/85fddf40/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index e66a8b4..de66617 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.2.0 +2.2.1-SNAPSHOT ../../pom.xml
[1/2] spark git commit: Preparing Spark release v2.2.0-rc6
Repository: spark Updated Branches: refs/heads/branch-2.2 29a0be2b3 -> 85fddf406 Preparing Spark release v2.2.0-rc6 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2c7b213 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2c7b213 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2c7b213 Branch: refs/heads/branch-2.2 Commit: a2c7b2133cfee7fa9abfaa2bfbfb637155466783 Parents: 29a0be2 Author: Patrick WendellAuthored: Fri Jun 30 15:54:34 2017 -0700 Committer: Patrick Wendell Committed: Fri Jun 30 15:54:34 2017 -0700 -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- python/pyspark/version.py | 2 +- repl/pom.xml | 2 +- resource-managers/mesos/pom.xml | 2 +- resource-managers/yarn/pom.xml| 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- 38 files changed, 39 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7b213/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index cfa49b9..879c1f8 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,6 +1,6 @@ Package: SparkR Type: Package -Version: 2.2.1 +Version: 2.2.0 Title: R Frontend for Apache Spark Description: The SparkR package provides an R Frontend for Apache Spark. 
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7b213/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index da7b0c9..3a7003f 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.2.1-SNAPSHOT +2.2.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7b213/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 7577253..5e9ffd1 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.2.1-SNAPSHOT +2.2.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7b213/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 558864a..c3e10d1 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.2.1-SNAPSHOT +2.2.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/a2c7b213/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index de66617..e66a8b4 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark
spark git commit: [SPARK-21052][SQL][FOLLOW-UP] Add hash map metrics to join
Repository: spark Updated Branches: refs/heads/master eed9c4ef8 -> fd1325522 [SPARK-21052][SQL][FOLLOW-UP] Add hash map metrics to join ## What changes were proposed in this pull request? Remove `numHashCollisions` in `BytesToBytesMap`. And change `getAverageProbesPerLookup()` to `getAverageProbesPerLookup` as suggested. ## How was this patch tested? Existing tests. Author: Liang-Chi HsiehCloses #18480 from viirya/SPARK-21052-followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd132552 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd132552 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd132552 Branch: refs/heads/master Commit: fd1325522549937232f37215db53d6478f48644c Parents: eed9c4e Author: Liang-Chi Hsieh Authored: Fri Jun 30 15:11:27 2017 -0700 Committer: gatorsmile Committed: Fri Jun 30 15:11:27 2017 -0700 -- .../spark/unsafe/map/BytesToBytesMap.java | 33 .../spark/sql/execution/joins/HashJoin.scala| 2 +- .../sql/execution/joins/HashedRelation.scala| 8 ++--- 3 files changed, 5 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fd132552/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java -- diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 4bef21b..3b6200e 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -160,14 +160,10 @@ public final class BytesToBytesMap extends MemoryConsumer { private final boolean enablePerfMetrics; - private long timeSpentResizingNs = 0; - private long numProbes = 0; private long numKeyLookups = 0; - private long numHashCollisions = 0; - private long peakMemoryUsedBytes = 0L; private final int initialCapacity; @@ -489,10 +485,6 @@ public final class BytesToBytesMap extends MemoryConsumer { ); if (areEqual) { return; -} else { - if (enablePerfMetrics) { -numHashCollisions++; - } } } } @@ -860,16 +852,6 @@ public final class BytesToBytesMap extends MemoryConsumer { } /** - * Returns the total amount of time spent resizing this map (in nanoseconds). - */ - public long getTimeSpentResizingNs() { -if (!enablePerfMetrics) { - throw new IllegalStateException(); -} -return timeSpentResizingNs; - } - - /** * Returns the average number of probes per key lookup. 
*/ public double getAverageProbesPerLookup() { @@ -879,13 +861,6 @@ public final class BytesToBytesMap extends MemoryConsumer { return (1.0 * numProbes) / numKeyLookups; } - public long getNumHashCollisions() { -if (!enablePerfMetrics) { - throw new IllegalStateException(); -} -return numHashCollisions; - } - @VisibleForTesting public int getNumDataPages() { return dataPages.size(); @@ -923,10 +898,6 @@ public final class BytesToBytesMap extends MemoryConsumer { void growAndRehash() { assert(longArray != null); -long resizeStartTime = -1; -if (enablePerfMetrics) { - resizeStartTime = System.nanoTime(); -} // Store references to the old data structures to be used when we re-hash final LongArray oldLongArray = longArray; final int oldCapacity = (int) oldLongArray.size() / 2; @@ -951,9 +922,5 @@ public final class BytesToBytesMap extends MemoryConsumer { longArray.set(newPos * 2 + 1, hashcode); } freeArray(oldLongArray); - -if (enablePerfMetrics) { - timeSpentResizingNs += System.nanoTime() - resizeStartTime; -} } } http://git-wip-us.apache.org/repos/asf/spark/blob/fd132552/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index b09edf3..0396168 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -215,7 +215,7 @@ trait HashJoin { // At the end of the task, we update the avg hash probe. TaskContext.get().addTaskCompletionListener(_ => - avgHashProbe.set(hashed.getAverageProbesPerLookup())) + avgHashProbe.set(hashed.getAverageProbesPerLookup))
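As background for the metric that survives this cleanup, the toy below shows what "average probes per lookup" measures in an open-addressed hash table. It is not Spark code; it only mirrors the numProbes / numKeyLookups ratio returned by getAverageProbesPerLookup in the diff above.

```
// Toy open-addressing lookup with linear probing. A ratio of 1.0 means every
// lookup hit (or missed) on its first slot, i.e. there were no collisions.
class ProbeCounter(slots: Array[Option[Int]]) {
  private var numProbes = 0L
  private var numKeyLookups = 0L

  // Assumes the table always keeps at least one empty slot, as BytesToBytesMap does.
  def lookup(key: Int): Option[Int] = {
    numKeyLookups += 1
    var pos = (key.hashCode & Int.MaxValue) % slots.length
    var probes = 1L
    while (slots(pos).exists(_ != key)) { // occupied by a different key: keep probing
      pos = (pos + 1) % slots.length
      probes += 1
    }
    numProbes += probes
    slots(pos)
  }

  // Mirrors getAverageProbesPerLookup in the diff above.
  def averageProbesPerLookup: Double = (1.0 * numProbes) / numKeyLookups
}
```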
spark git commit: [SPARK-21129][SQL] Arguments of SQL function call should not be named expressions
Repository: spark Updated Branches: refs/heads/branch-2.2 8b08fd06c -> 29a0be2b3 [SPARK-21129][SQL] Arguments of SQL function call should not be named expressions ### What changes were proposed in this pull request? Function argument should not be named expressions. It could cause two issues: - Misleading error message - Unexpected query results when the column name is `distinct`, which is not a reserved word in our parser. ``` spark-sql> select count(distinct c1, distinct c2) from t1; Error in query: cannot resolve '`distinct`' given input columns: [c1, c2]; line 1 pos 26; 'Project [unresolvedalias('count(c1#30, 'distinct), None)] +- SubqueryAlias t1 +- CatalogRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#30, c2#31] ``` After the fix, the error message becomes ``` spark-sql> select count(distinct c1, distinct c2) from t1; Error in query: extraneous input 'c2' expecting {')', ',', '.', '[', 'OR', 'AND', 'IN', NOT, 'BETWEEN', 'LIKE', RLIKE, 'IS', EQ, '<=>', '<>', '!=', '<', LTE, '>', GTE, '+', '-', '*', '/', '%', 'DIV', '&', '|', '||', '^'}(line 1, pos 35) == SQL == select count(distinct c1, distinct c2) from t1 ---^^^ ``` ### How was this patch tested? Added a test case to parser suite. Author: Xiao LiAuthor: gatorsmile Closes #18338 from gatorsmile/parserDistinctAggFunc. (cherry picked from commit eed9c4ef859fdb75a816a3e0ce2d593b34b23444) Signed-off-by: gatorsmile Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29a0be2b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29a0be2b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29a0be2b Branch: refs/heads/branch-2.2 Commit: 29a0be2b3d42bfe991f47725f077892918731e08 Parents: 8b08fd0 Author: Xiao Li Authored: Fri Jun 30 14:23:56 2017 -0700 Committer: gatorsmile Committed: Fri Jun 30 14:24:05 2017 -0700 -- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 3 +- .../apache/spark/sql/catalyst/dsl/package.scala | 1 + .../spark/sql/catalyst/parser/AstBuilder.scala | 9 +- .../catalyst/parser/ExpressionParserSuite.scala | 6 ++-- .../sql/catalyst/parser/PlanParserSuite.scala | 6 .../test/resources/sql-tests/inputs/struct.sql | 7 + .../resources/sql-tests/results/struct.sql.out | 32 +++- 7 files changed, 59 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/29a0be2b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 -- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 31b1c67..499f27f 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -552,6 +552,7 @@ primaryExpression | CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase | CASE value=expression whenClause+ (ELSE elseExpression=expression)? END #simpleCase | CAST '(' expression AS dataType ')' #cast +| STRUCT '(' (argument+=namedExpression (',' argument+=namedExpression)*)? ')' #struct | FIRST '(' expression (IGNORE NULLS)? ')' #first | LAST '(' expression (IGNORE NULLS)? ')' #last | constant #constantDefault @@ -559,7 +560,7 @@ primaryExpression | qualifiedName '.' ASTERISK #star | '(' namedExpression (',' namedExpression)+ ')' #rowConstructor | '(' query ')' #subqueryExpression -| qualifiedName '(' (setQuantifier? namedExpression (',' namedExpression)*)? 
')' +| qualifiedName '(' (setQuantifier? argument+=expression (',' argument+=expression)*)? ')' (OVER windowSpec)? #functionCall | value=primaryExpression '[' index=valueExpression ']' #subscript | identifier #columnReference
spark git commit: [SPARK-21129][SQL] Arguments of SQL function call should not be named expressions
Repository: spark Updated Branches: refs/heads/master 1fe08d62f -> eed9c4ef8 [SPARK-21129][SQL] Arguments of SQL function call should not be named expressions ### What changes were proposed in this pull request? Function argument should not be named expressions. It could cause two issues: - Misleading error message - Unexpected query results when the column name is `distinct`, which is not a reserved word in our parser. ``` spark-sql> select count(distinct c1, distinct c2) from t1; Error in query: cannot resolve '`distinct`' given input columns: [c1, c2]; line 1 pos 26; 'Project [unresolvedalias('count(c1#30, 'distinct), None)] +- SubqueryAlias t1 +- CatalogRelation `default`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#30, c2#31] ``` After the fix, the error message becomes ``` spark-sql> select count(distinct c1, distinct c2) from t1; Error in query: extraneous input 'c2' expecting {')', ',', '.', '[', 'OR', 'AND', 'IN', NOT, 'BETWEEN', 'LIKE', RLIKE, 'IS', EQ, '<=>', '<>', '!=', '<', LTE, '>', GTE, '+', '-', '*', '/', '%', 'DIV', '&', '|', '||', '^'}(line 1, pos 35) == SQL == select count(distinct c1, distinct c2) from t1 ---^^^ ``` ### How was this patch tested? Added a test case to parser suite. Author: Xiao LiAuthor: gatorsmile Closes #18338 from gatorsmile/parserDistinctAggFunc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eed9c4ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eed9c4ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eed9c4ef Branch: refs/heads/master Commit: eed9c4ef859fdb75a816a3e0ce2d593b34b23444 Parents: 1fe08d6 Author: Xiao Li Authored: Fri Jun 30 14:23:56 2017 -0700 Committer: gatorsmile Committed: Fri Jun 30 14:23:56 2017 -0700 -- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 3 +- .../apache/spark/sql/catalyst/dsl/package.scala | 1 + .../spark/sql/catalyst/parser/AstBuilder.scala | 9 +- .../catalyst/parser/ExpressionParserSuite.scala | 6 ++-- .../sql/catalyst/parser/PlanParserSuite.scala | 6 .../test/resources/sql-tests/inputs/struct.sql | 7 + .../resources/sql-tests/results/struct.sql.out | 32 +++- 7 files changed, 59 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eed9c4ef/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 -- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 9456031..7ffa150 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -561,6 +561,7 @@ primaryExpression | CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase | CASE value=expression whenClause+ (ELSE elseExpression=expression)? END #simpleCase | CAST '(' expression AS dataType ')' #cast +| STRUCT '(' (argument+=namedExpression (',' argument+=namedExpression)*)? ')' #struct | FIRST '(' expression (IGNORE NULLS)? ')' #first | LAST '(' expression (IGNORE NULLS)? ')' #last | POSITION '(' substr=valueExpression IN str=valueExpression ')' #position @@ -569,7 +570,7 @@ primaryExpression | qualifiedName '.' ASTERISK #star | '(' namedExpression (',' namedExpression)+ ')' #rowConstructor | '(' query ')' #subqueryExpression -| qualifiedName '(' (setQuantifier? namedExpression (',' namedExpression)*)? ')' +| qualifiedName '(' (setQuantifier? 
argument+=expression (',' argument+=expression)*)? ')' (OVER windowSpec)? #functionCall | value=primaryExpression '[' index=valueExpression ']' #subscript | identifier #columnReference http://git-wip-us.apache.org/repos/asf/spark/blob/eed9c4ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
spark git commit: [SPARK-21223] Change fileToAppInfo in FsHistoryProvider to fix concurrent issue.
Repository: spark Updated Branches: refs/heads/master 528c9281a -> 1fe08d62f [SPARK-21223] Change fileToAppInfo in FsHistoryProvider to fix concurrent issue. # What issue does this PR address? Jira: https://issues.apache.org/jira/browse/SPARK-21223 Fix the thread-safety issue in FsHistoryProvider. Currently, the Spark HistoryServer uses a HashMap named fileToAppInfo in class FsHistoryProvider to store the map of event log path to attemptInfo. When a thread pool is used to replay the log files in the list and merge the list of old applications with new ones, multiple threads may update fileToAppInfo at the same time, which can cause thread-safety issues, such as falling into an infinite loop when the hash table's resize function is called. Author: 曾林西. Closes #18430 from zenglinxi0615/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fe08d62 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fe08d62 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fe08d62 Branch: refs/heads/master Commit: 1fe08d62f022e12f2f0161af5d8f9eac51baf1b9 Parents: 528c928 Author: 曾林西 Authored: Fri Jun 30 19:28:43 2017 +0100 Committer: Sean Owen Committed: Fri Jun 30 19:28:43 2017 +0100 -- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 9 + 1 file changed, 5 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1fe08d62/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index d05ca14..b2a50bd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.history import java.io.{FileNotFoundException, IOException, OutputStream} import java.util.UUID -import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService, Future, TimeUnit} import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.mutable @@ -122,7 +122,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] = new mutable.LinkedHashMap() - val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]() + val fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]() // List of application logs to be deleted by event log cleaner. private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] @@ -321,7 +321,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // scan for modified applications, replay and merge them val logInfos: Seq[FileStatus] = statusList .filter { entry => - val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L) + val fileInfo = fileToAppInfo.get(entry.getPath()) + val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 0L !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. 
Accidentally // reading a garbage file is safe, but we would log an error which can be scary to @@ -475,7 +476,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) fileStatus.getLen(), appListener.appSparkVersion.getOrElse("") ) -fileToAppInfo(logPath) = attemptInfo +fileToAppInfo.put(logPath, attemptInfo) logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo") Some(attemptInfo) } else {
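A minimal sketch of the pattern the SPARK-21223 fix adopts: a java.util.concurrent.ConcurrentHashMap tolerates the replay thread pool inserting entries while another thread reads them, whereas a plain mutable.HashMap can be corrupted by concurrent resizes. The names below are illustrative, not the actual FsHistoryProvider fields; note that get returns null rather than an Option, which is why the patched filter above checks for null explicitly.

```
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

case class AttemptInfo(fileSize: Long) // stand-in for FsApplicationAttemptInfo

val fileToInfo = new ConcurrentHashMap[String, AttemptInfo]()
val pool = Executors.newFixedThreadPool(4)

// Many replay tasks writing concurrently, as the history server's thread pool does.
(1 to 1000).foreach { i =>
  pool.submit(new Runnable {
    override def run(): Unit = fileToInfo.put(s"eventlog-$i", AttemptInfo(i * 100L))
  })
}
pool.shutdown()
pool.awaitTermination(1, TimeUnit.MINUTES)

// get returns null (not an Option) for absent keys, mirroring the patched filter.
val info = fileToInfo.get("eventlog-42")
val prevFileSize = if (info != null) info.fileSize else 0L
println(prevFileSize)
```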
spark git commit: [ML] Fix scala-2.10 build failure of GeneralizedLinearRegressionSuite.
Repository: spark Updated Branches: refs/heads/master 3c2fc19d4 -> 528c9281a [ML] Fix scala-2.10 build failure of GeneralizedLinearRegressionSuite. ## What changes were proposed in this pull request? Fix scala-2.10 build failure of ```GeneralizedLinearRegressionSuite```. ## How was this patch tested? Build with scala-2.10. Author: Yanbo Liang. Closes #18489 from yanboliang/glr. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/528c9281 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/528c9281 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/528c9281 Branch: refs/heads/master Commit: 528c9281aecc49e9bff204dd303962c705c6f237 Parents: 3c2fc19 Author: Yanbo Liang Authored: Fri Jun 30 23:25:14 2017 +0800 Committer: Yanbo Liang Committed: Fri Jun 30 23:25:14 2017 +0800 -- .../ml/regression/GeneralizedLinearRegressionSuite.scala | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/528c9281/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index cfaa573..83f1344 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -1075,7 +1075,7 @@ class GeneralizedLinearRegressionSuite val seCoefR = Array(1.23439, 0.9669, 3.56866) val tValsR = Array(0.80297, -0.65737, -0.06017) val pValsR = Array(0.42199, 0.51094, 0.95202) -val dispersionR = 1 +val dispersionR = 1.0 val nullDevianceR = 2.17561 val residualDevianceR = 0.00018 val residualDegreeOfFreedomNullR = 3 @@ -1114,7 +1114,7 @@ class GeneralizedLinearRegressionSuite assert(x._1 ~== x._2 absTol 1E-3) } summary.tValues.zip(tValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } summary.pValues.zip(pValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } -assert(summary.dispersion ~== dispersionR absTol 1E-3) +assert(summary.dispersion === dispersionR) assert(summary.nullDeviance ~== nullDevianceR absTol 1E-3) assert(summary.deviance ~== residualDevianceR absTol 1E-3) assert(summary.residualDegreeOfFreedom === residualDegreeOfFreedomR) @@ -1190,7 +1190,7 @@ class GeneralizedLinearRegressionSuite val seCoefR = Array(1.16826, 0.41703, 1.96249) val tValsR = Array(-2.46387, 2.12428, -2.32757) val pValsR = Array(0.01374, 0.03365, 0.01993) -val dispersionR = 1 +val dispersionR = 1.0 val nullDevianceR = 22.55853 val residualDevianceR = 9.5622 val residualDegreeOfFreedomNullR = 3 @@ -1229,7 +1229,7 @@ class GeneralizedLinearRegressionSuite assert(x._1 ~== x._2 absTol 1E-3) } summary.tValues.zip(tValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } summary.pValues.zip(pValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } -assert(summary.dispersion ~== dispersionR absTol 1E-3) +assert(summary.dispersion === dispersionR) assert(summary.nullDeviance ~== nullDevianceR absTol 1E-3) assert(summary.deviance ~== residualDevianceR absTol 1E-3) assert(summary.residualDegreeOfFreedom === residualDegreeOfFreedomR)
spark git commit: [SPARK-18294][CORE] Implement commit protocol to support `mapred` package's committer
Repository: spark Updated Branches: refs/heads/master 49d767d83 -> 3c2fc19d4 [SPARK-18294][CORE] Implement commit protocol to support `mapred` package's committer ## What changes were proposed in this pull request? This PR makes the following changes: - Implement a new commit protocol `HadoopMapRedCommitProtocol` which support the old `mapred` package's committer; - Refactor SparkHadoopWriter and SparkHadoopMapReduceWriter, now they are combined together, thus we can support write through both mapred and mapreduce API by the new SparkHadoopWriter, a lot of duplicated codes are removed. After this change, it should be pretty easy for us to support the committer from both the new and the old hadoop API at high level. ## How was this patch tested? No major behavior change, passed the existing test cases. Author: Xingbo JiangCloses #18438 from jiangxb1987/SparkHadoopWriter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c2fc19d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c2fc19d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c2fc19d Branch: refs/heads/master Commit: 3c2fc19d478256f8dc0ae7219fdd188030218c07 Parents: 49d767d Author: Xingbo Jiang Authored: Fri Jun 30 20:30:26 2017 +0800 Committer: Wenchen Fan Committed: Fri Jun 30 20:30:26 2017 +0800 -- .../io/HadoopMapRedCommitProtocol.scala | 36 ++ .../internal/io/HadoopWriteConfigUtil.scala | 79 .../io/SparkHadoopMapReduceWriter.scala | 181 - .../spark/internal/io/SparkHadoopWriter.scala | 393 +++ .../org/apache/spark/rdd/PairRDDFunctions.scala | 72 +--- .../spark/rdd/PairRDDFunctionsSuite.scala | 2 +- .../OutputCommitCoordinatorSuite.scala | 35 +- 7 files changed, 461 insertions(+), 337 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c2fc19d/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala new file mode 100644 index 000..ddbd624 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapRedCommitProtocol.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io + +import org.apache.hadoop.mapred._ +import org.apache.hadoop.mapreduce.{TaskAttemptContext => NewTaskAttemptContext} + +/** + * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter + * (from the old mapred API). + * + * Unlike Hadoop's OutputCommitter, this implementation is serializable. 
+ */ +class HadoopMapRedCommitProtocol(jobId: String, path: String) + extends HadoopMapReduceCommitProtocol(jobId, path) { + + override def setupCommitter(context: NewTaskAttemptContext): OutputCommitter = { +val config = context.getConfiguration.asInstanceOf[JobConf] +config.getOutputCommitter + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3c2fc19d/core/src/main/scala/org/apache/spark/internal/io/HadoopWriteConfigUtil.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopWriteConfigUtil.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopWriteConfigUtil.scala new file mode 100644 index 000..9b987e0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopWriteConfigUtil.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + *
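At the user level the change in SPARK-18294 is transparent: both the old mapred and the new mapreduce save paths now funnel through the unified SparkHadoopWriter. The sketch below only illustrates which public RDD API exercises which path; sc is an assumed SparkContext and the output paths are placeholders.

```
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

val pairs = sc.parallelize(Seq(
  (new IntWritable(1), new Text("a")),
  (new IntWritable(2), new Text("b"))))

// Old `mapred` API path, whose OutputCommitter the new HadoopMapRedCommitProtocol supports.
pairs.saveAsHadoopFile("/tmp/out-old-api", classOf[IntWritable], classOf[Text],
  classOf[TextOutputFormat[IntWritable, Text]])

// New `mapreduce` API path, handled by the existing HadoopMapReduceCommitProtocol.
pairs.saveAsNewAPIHadoopFile("/tmp/out-new-api", classOf[IntWritable], classOf[Text],
  classOf[NewTextOutputFormat[IntWritable, Text]])
```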
spark git commit: [SPARK-18710][ML] Add offset in GLM
Repository: spark Updated Branches: refs/heads/master 52981715b -> 49d767d83 [SPARK-18710][ML] Add offset in GLM ## What changes were proposed in this pull request? Add support for offset in GLM. This is useful for at least two reasons: 1. Account for exposure: e.g., when modeling the number of accidents, we may need to use miles driven as an offset to assess factors on frequency. 2. Test incremental effects of new variables: we can use predictions from the existing model as an offset and run a much smaller model on only the new variables. This avoids re-estimating the large model with all variables (old + new) and can be very important for efficient large-scale analysis. ## How was this patch tested? New test. yanboliang srowen felixcheung sethah Author: actuaryzhang. Closes #16699 from actuaryzhang/offset. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49d767d8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49d767d8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49d767d8 Branch: refs/heads/master Commit: 49d767d838691fc7d964be2c4349662f5500ff2b Parents: 5298171 Author: actuaryzhang Authored: Fri Jun 30 20:02:15 2017 +0800 Committer: Yanbo Liang Committed: Fri Jun 30 20:02:15 2017 +0800 -- .../org/apache/spark/ml/feature/Instance.scala | 21 + .../IterativelyReweightedLeastSquares.scala | 14 +- .../spark/ml/optim/WeightedLeastSquares.scala | 2 +- .../GeneralizedLinearRegression.scala | 184 -- ...IterativelyReweightedLeastSquaresSuite.scala | 40 +- .../GeneralizedLinearRegressionSuite.scala | 634 +++ 6 files changed, 534 insertions(+), 361 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/49d767d8/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index cce3ca4..dd56fbb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -27,3 +27,24 @@ import org.apache.spark.ml.linalg.Vector * @param features The vector of features for this data point. */ private[ml] case class Instance(label: Double, weight: Double, features: Vector) + +/** + * Case class that represents an instance of data point with + * label, weight, offset and features. + * This is mainly used in GeneralizedLinearRegression currently. + * + * @param label Label for this data point. + * @param weight The weight of this instance. + * @param offset The offset used for this data point. + * @param features The vector of features for this data point. + */ +private[ml] case class OffsetInstance( +label: Double, +weight: Double, +offset: Double, +features: Vector) { + + /** Converts to an [[Instance]] object by leaving out the offset. 
*/ + def toInstance: Instance = Instance(label, weight, features) + +} http://git-wip-us.apache.org/repos/asf/spark/blob/49d767d8/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala index 9c49551..6961b45 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala @@ -18,7 +18,7 @@ package org.apache.spark.ml.optim import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, OffsetInstance} import org.apache.spark.ml.linalg._ import org.apache.spark.rdd.RDD @@ -43,7 +43,7 @@ private[ml] class IterativelyReweightedLeastSquaresModel( * find M-estimator in robust regression and other optimization problems. * * @param initialModel the initial guess model. - * @param reweightFunc the reweight function which is used to update offsets and weights + * @param reweightFunc the reweight function which is used to update working labels and weights * at each iteration. * @param fitIntercept whether to fit intercept. * @param regParam L2 regularization parameter used by WLS. @@ -57,13 +57,13 @@ private[ml] class IterativelyReweightedLeastSquaresModel( */ private[ml] class IterativelyReweightedLeastSquares( val initialModel: WeightedLeastSquaresModel, -val reweightFunc:
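The exposure use case from SPARK-18710's description maps onto the new offset parameter roughly as follows. The column and dataset names are hypothetical; only setOffsetCol is the API added by this change.

```
import org.apache.spark.ml.regression.GeneralizedLinearRegression

// Poisson model of accident counts, with log(miles driven) as the offset so the
// model effectively estimates an accident rate per mile of exposure.
val glr = new GeneralizedLinearRegression()
  .setFamily("poisson")
  .setLink("log")
  .setLabelCol("accidents")
  .setFeaturesCol("features")
  .setOffsetCol("logMiles")      // parameter introduced by this change

val model = glr.fit(training)    // `training` is an assumed DataFrame with these columns
println(model.coefficients)
```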
spark git commit: Revert "[SPARK-21258][SQL] Fix WindowExec complex object aggregation with spilling"
Repository: spark Updated Branches: refs/heads/branch-2.1 d995dac1c -> 3ecef2491 Revert "[SPARK-21258][SQL] Fix WindowExec complex object aggregation with spilling" This reverts commit d995dac1cdeec940364453675f59ce5cf2b53684. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ecef249 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ecef249 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ecef249 Branch: refs/heads/branch-2.1 Commit: 3ecef249184cdcf74765070d3ea39cf180214976 Parents: d995dac Author: Wenchen FanAuthored: Fri Jun 30 14:45:55 2017 +0800 Committer: Wenchen Fan Committed: Fri Jun 30 14:45:55 2017 +0800 -- .../execution/window/AggregateProcessor.scala | 7 +-- .../sql/DataFrameWindowFunctionsSuite.scala | 47 +--- 2 files changed, 3 insertions(+), 51 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3ecef249/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala index dfa1100..c9f5d3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala @@ -145,13 +145,10 @@ private[window] final class AggregateProcessor( /** Update the buffer. */ def update(input: InternalRow): Unit = { -// TODO(hvanhovell) this sacrifices performance for correctness. We should make sure that -// MutableProjection makes copies of the complex input objects it buffer. -val copy = input.copy() -updateProjection(join(buffer, copy)) +updateProjection(join(buffer, input)) var i = 0 while (i < numImperatives) { - imperatives(i).update(buffer, copy) + imperatives(i).update(buffer, input) i += 1 } } http://git-wip-us.apache.org/repos/asf/spark/blob/3ecef249/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 204858f..1255c49 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala @@ -19,9 +19,8 @@ package org.apache.spark.sql import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window} import org.apache.spark.sql.functions._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.{DataType, LongType, StructType} /** * Window function testing for DataFrame API. @@ -424,48 +423,4 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext { df.select(selectList: _*).where($"value" < 2), Seq(Row(3, "1", null, 3.0, 4.0, 3.0), Row(5, "1", false, 4.0, 5.0, 5.0))) } - - test("SPARK-21258: complex object in combination with spilling") { -// Make sure we trigger the spilling path. -withSQLConf(SQLConf.WINDOW_EXEC_BUFFER_SPILL_THRESHOLD.key -> "17") { - val sampleSchema = new StructType(). -add("f0", StringType). -add("f1", LongType). -add("f2", ArrayType(new StructType(). - add("f20", StringType))). -add("f3", ArrayType(new StructType(). 
- add("f30", StringType))) - - val w0 = Window.partitionBy("f0").orderBy("f1") - val w1 = w0.rowsBetween(Long.MinValue, Long.MaxValue) - - val c0 = first(struct($"f2", $"f3")).over(w0) as "c0" - val c1 = last(struct($"f2", $"f3")).over(w1) as "c1" - - val input = - """{"f1":1497820153720,"f2":[{"f20":"x","f21":0}],"f3":[{"f30":"x","f31":0}]} - |{"f1":1497802179638} - |{"f1":1497802189347} - |{"f1":1497802189593} - |{"f1":1497802189597} - |{"f1":1497802189599} - |{"f1":1497802192103} - |{"f1":1497802193414} - |{"f1":1497802193577} - |{"f1":1497802193709} - |{"f1":1497802202883} - |{"f1":1497802203006} - |{"f1":1497802203743} - |{"f1":1497802203834} - |{"f1":1497802203887} - |{"f1":1497802203893} - |{"f1":1497802203976} - |{"f1":1497820168098} -
spark git commit: [SPARK-20889][SPARKR] Grouped documentation for COLLECTION column methods
Repository: spark Updated Branches: refs/heads/master fddb63f46 -> 52981715b [SPARK-20889][SPARKR] Grouped documentation for COLLECTION column methods ## What changes were proposed in this pull request? Grouped documentation for column collection methods. Author: actuaryzhangAuthor: Wayne Zhang Closes #18458 from actuaryzhang/sparkRDocCollection. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52981715 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52981715 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52981715 Branch: refs/heads/master Commit: 52981715bb8d653a1141f55b36da804412eb783a Parents: fddb63f Author: actuaryzhang Authored: Thu Jun 29 23:00:50 2017 -0700 Committer: Felix Cheung Committed: Thu Jun 29 23:00:50 2017 -0700 -- R/pkg/R/functions.R | 204 +-- R/pkg/R/generics.R | 27 --- 2 files changed, 108 insertions(+), 123 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/52981715/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 67cb7a7..a1f5c4f 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -171,6 +171,35 @@ NULL #' } NULL +#' Collection functions for Column operations +#' +#' Collection functions defined for \code{Column}. +#' +#' @param x Column to compute on. Note the difference in the following methods: +#' \itemize{ +#' \item \code{to_json}: it is the column containing the struct or array of the structs. +#' \item \code{from_json}: it is the column containing the JSON string. +#' } +#' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains +#'additional named properties to control how it is converted, accepts the same +#'options as the JSON data source. +#' @name column_collection_functions +#' @rdname column_collection_functions +#' @family collection functions +#' @examples +#' \dontrun{ +#' # Dataframe used throughout this doc +#' df <- createDataFrame(cbind(model = rownames(mtcars), mtcars)) +#' df <- createDataFrame(cbind(model = rownames(mtcars), mtcars)) +#' tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp)) +#' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1))) +#' tmp2 <- mutate(tmp, v2 = explode(tmp$v1)) +#' head(tmp2) +#' head(select(tmp, posexplode(tmp$v1))) +#' head(select(tmp, sort_array(tmp$v1))) +#' head(select(tmp, sort_array(tmp$v1, asc = FALSE)))} +NULL + #' @details #' \code{lit}: A new Column is created to represent the literal value. #' If the parameter is a Column, it is returned unchanged. @@ -1642,30 +1671,23 @@ setMethod("to_date", column(jc) }) -#' to_json -#' -#' Converts a column containing a \code{structType} or array of \code{structType} into a Column -#' of JSON string. Resolving the Column can fail if an unsupported type is encountered. -#' -#' @param x Column containing the struct or array of the structs -#' @param ... additional named properties to control how it is converted, accepts the same options -#'as the JSON data source. +#' @details +#' \code{to_json}: Converts a column containing a \code{structType} or array of \code{structType} +#' into a Column of JSON string. Resolving the Column can fail if an unsupported type is encountered. 
#' -#' @family non-aggregate functions -#' @rdname to_json -#' @name to_json -#' @aliases to_json,Column-method +#' @rdname column_collection_functions +#' @aliases to_json to_json,Column-method #' @export #' @examples +#' #' \dontrun{ #' # Converts a struct into a JSON object -#' df <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d") -#' select(df, to_json(df$d, dateFormat = 'dd/MM/')) +#' df2 <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d") +#' select(df2, to_json(df2$d, dateFormat = 'dd/MM/')) #' #' # Converts an array of structs into a JSON array -#' df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people") -#' select(df, to_json(df$people)) -#'} +#' df2 <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people") +#' df2 <- mutate(df2, people_json = to_json(df2$people))} #' @note to_json since 2.2.0 setMethod("to_json", signature(x = "Column"), function(x, ...) { @@ -2120,28 +2142,28 @@ setMethod("date_format", signature(y = "Column", x = "character"), column(jc) }) -#' from_json -#' -#' Parses a column containing a JSON string into a Column of \code{structType} with