spark git commit: [SPARK-10684] [SQL] StructType.interpretedOrdering need not be serialized
Repository: spark
Updated Branches: refs/heads/master 74d8f7dda -> e3b5d6cb2

[SPARK-10684] [SQL] StructType.interpretedOrdering need not be serialized

Kryo fails with a buffer overflow even when spark.kryoserializer.buffer.max is set to its maximum value (2G).

{noformat}
org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 1
Serialization trace:
containsChild (org.apache.spark.sql.catalyst.expressions.BoundReference)
child (org.apache.spark.sql.catalyst.expressions.SortOrder)
array (scala.collection.mutable.ArraySeq)
ordering (org.apache.spark.sql.catalyst.expressions.InterpretedOrdering)
interpretedOrdering (org.apache.spark.sql.types.StructType)
schema (org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema).
To avoid this, increase spark.kryoserializer.buffer.max value.
        at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:263)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

Author: navis.ryu

Closes #8808 from navis/SPARK-10684.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3b5d6cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3b5d6cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3b5d6cb

Branch: refs/heads/master
Commit: e3b5d6cb29e0f983fcc55920619e6433298955f5
Parents: 74d8f7d
Author: navis.ryu
Authored: Fri Sep 18 00:43:02 2015 -0700
Committer: Reynold Xin
Committed: Fri Sep 18 00:43:02 2015 -0700

--
 .../src/main/scala/org/apache/spark/sql/types/StructType.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e3b5d6cb/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index d8968ef..b29cf22 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -305,7 +305,9 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     f(this) || fields.exists(field => field.dataType.existsRecursively(f))
   }
 
-  private[sql] val interpretedOrdering = InterpretedOrdering.forSchema(this.fields.map(_.dataType))
+  @transient
+  private[sql] lazy val interpretedOrdering =
+    InterpretedOrdering.forSchema(this.fields.map(_.dataType))
 }
 
 object StructType extends AbstractDataType {
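For reference, the patch uses the standard JVM/Scala idiom for derived fields: marking a field `@transient lazy val` keeps it out of the serialized form entirely and recomputes it on first access after deserialization. Below is a minimal, self-contained sketch of the idiom under illustrative names (`Schema` and its string ordering are stand-ins, not Spark classes):

import java.io._

// Minimal sketch of the @transient lazy val pattern applied in this patch.
case class Schema(fieldNames: Seq[String]) {
  // Never serialized; rebuilt lazily on first use after deserialization.
  @transient private lazy val interpretedOrdering: Ordering[Seq[String]] =
    Ordering.by(_.mkString(","))

  def compare(a: Seq[String], b: Seq[String]): Int =
    interpretedOrdering.compare(a, b)
}

object TransientLazyDemo extends App {
  val schema = Schema(Seq("a", "b"))

  // Round-trip through Java serialization: only `fieldNames` is written.
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(schema)
  out.flush()
  val copy = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    .readObject().asInstanceOf[Schema]

  // The ordering is rebuilt transparently on the deserialized copy.
  println(copy.compare(Seq("a", "x"), Seq("a", "y")) < 0) // true
}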
spark git commit: [SPARK-9808] Remove hash shuffle file consolidation.
Repository: spark
Updated Branches: refs/heads/master 3a22b1004 -> 348d7c9a9

[SPARK-9808] Remove hash shuffle file consolidation.

Author: Reynold Xin

Closes #8812 from rxin/SPARK-9808-1.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/348d7c9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/348d7c9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/348d7c9a

Branch: refs/heads/master
Commit: 348d7c9a93dd00d3d1859342a8eb0aea2e77f597
Parents: 3a22b10
Author: Reynold Xin
Authored: Fri Sep 18 13:48:41 2015 -0700
Committer: Josh Rosen
Committed: Fri Sep 18 13:48:41 2015 -0700

--
 .../shuffle/FileShuffleBlockResolver.scala      | 178 ++-
 .../org/apache/spark/storage/BlockManager.scala |   9 -
 .../org/apache/spark/storage/DiskStore.scala    |   3 -
 .../shuffle/hash/HashShuffleManagerSuite.scala  | 110
 docs/configuration.md                           |  10 --
 .../shuffle/ExternalShuffleBlockResolver.java   |   4 -
 project/MimaExcludes.scala                      |   4 +
 7 files changed, 17 insertions(+), 301 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/348d7c9a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
--
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index c057de9..d9902f9 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -17,9 +17,7 @@
 package org.apache.spark.shuffle
 
-import java.io.File
 import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 
@@ -28,10 +26,8 @@
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup
 import org.apache.spark.storage._
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
 /** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
@@ -43,24 +39,7 @@ private[spark] trait ShuffleWriterGroup {
 /**
  * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
- * per reducer (this set of files is called a ShuffleFileGroup).
- *
- * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
- * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
- * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
- * files, it releases them for another task.
- * Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
- *   - shuffleId: The unique id given to the entire shuffle stage.
- *   - bucketId: The id of the output partition (i.e., reducer id)
- *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
- *     time owns a particular fileId, and this id is returned to a pool when the task finishes.
- * Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
- * that specifies where in a given file the actual block data is located.
- *
- * Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
- * ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
- * each block stored in each file. In order to find the location of a shuffle block, we search the
- * files within a ShuffleFileGroups associated with the block's reducer.
+ * per reducer.
  */
 // Note: Changes to the format in this file should be kept in sync with
 // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
@@ -71,26 +50,15 @@
 private[spark] class FileShuffleBlockResolver(conf: SparkConf)
 
   private lazy val blockManager = SparkEnv.get.blockManager
 
-  // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
-  // TODO: Remove this once the shuffle file consolidation feature is stable.
-  private val consolidateShuffleFiles =
-    conf.getBoolean("spark.shuffle.consolidateFiles", false)
-
spark git commit: [SPARK-10539] [SQL] Project should not be pushed down through Intersect or Except #8742
Repository: spark
Updated Branches: refs/heads/master 00a2911c5 -> c6f8135ee

[SPARK-10539] [SQL] Project should not be pushed down through Intersect or Except #8742

Intersect and Except are both set operators, and they use all of the columns to compare equality between rows. When their Project parent is pushed down through them, the relations they are based on change, so the transformation is not equivalent.

JIRA: https://issues.apache.org/jira/browse/SPARK-10539

I added some comments based on the fix of https://github.com/apache/spark/pull/8742.

Author: Yijie Shen
Author: Yin Huai

Closes #8823 from yhuai/fix_set_optimization.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6f8135e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6f8135e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6f8135e

Branch: refs/heads/master
Commit: c6f8135ee52202bd86adb090ab631e80330ea4df
Parents: 00a2911
Author: Yijie Shen
Authored: Fri Sep 18 13:20:13 2015 -0700
Committer: Yin Huai
Committed: Fri Sep 18 13:20:13 2015 -0700

--
 .../sql/catalyst/optimizer/Optimizer.scala      | 37 ++--
 .../optimizer/SetOperationPushDownSuite.scala   | 23 ++--
 .../org/apache/spark/sql/DataFrameSuite.scala   |  9 +
 3 files changed, 39 insertions(+), 30 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c6f8135e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 648a65e..324f40a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -85,7 +85,22 @@ object SamplePushDown extends Rule[LogicalPlan] {
 }
 
 /**
- * Pushes operations to either side of a Union, Intersect or Except.
+ * Pushes certain operations to both sides of a Union, Intersect or Except operator.
+ * Operations that are safe to pushdown are listed as follows.
+ * Union:
+ *   Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is
+ *   safe to pushdown Filters and Projections through it. Once we add UNION DISTINCT,
+ *   we will not be able to pushdown Projections.
+ *
+ * Intersect:
+ *   It is not safe to pushdown Projections through it because we need to get the
+ *   intersect of rows by comparing the entire rows. It is fine to pushdown Filters
+ *   because we will not have non-deterministic expressions.
+ *
+ * Except:
+ *   It is not safe to pushdown Projections through it because we need to get the
+ *   intersect of rows by comparing the entire rows. It is fine to pushdown Filters
+ *   because we will not have non-deterministic expressions.
+ */
 object SetOperationPushDown extends Rule[LogicalPlan] {
@@ -122,40 +137,26 @@ object SetOperationPushDown extends Rule[LogicalPlan] {
         Filter(condition, left),
         Filter(pushToRight(condition, rewrites), right))
 
-    // Push down projection into union
+    // Push down projection through UNION ALL
     case Project(projectList, u @ Union(left, right)) =>
       val rewrites = buildRewrites(u)
       Union(
         Project(projectList, left),
         Project(projectList.map(pushToRight(_, rewrites)), right))
 
-    // Push down filter into intersect
+    // Push down filter through INTERSECT
     case Filter(condition, i @ Intersect(left, right)) =>
       val rewrites = buildRewrites(i)
       Intersect(
         Filter(condition, left),
         Filter(pushToRight(condition, rewrites), right))
 
-    // Push down projection into intersect
-    case Project(projectList, i @ Intersect(left, right)) =>
-      val rewrites = buildRewrites(i)
-      Intersect(
-        Project(projectList, left),
-        Project(projectList.map(pushToRight(_, rewrites)), right))
-
-    // Push down filter into except
+    // Push down filter through EXCEPT
    case Filter(condition, e @ Except(left, right)) =>
       val rewrites = buildRewrites(e)
       Except(
         Filter(condition, left),
         Filter(pushToRight(condition, rewrites), right))
-
-    // Push down projection into except
-    case Project(projectList, e @ Except(left, right)) =>
-      val rewrites = buildRewrites(e)
-      Except(
-        Project(projectList, left),
-        Project(projectList.map(pushToRight(_, rewrites)), right))
   }
 }
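For intuition, here is a tiny hypothetical example (not part of the patch) of why pushing a Project below Intersect changes results: INTERSECT compares whole rows, so projecting away a differing column can manufacture matches that should not exist. A runnable sketch against the 1.5-era DataFrame API:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ProjectPushdownDemo extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val left  = Seq((1, 10)).toDF("a", "b")
  val right = Seq((1, 20)).toDF("a", "b")

  // Correct order: intersect entire rows first, then project.
  // Yields no rows, because (1, 10) and (1, 20) differ in column b.
  left.intersect(right).select("a").show()

  // What the removed rule would have produced: projecting first makes
  // the rows compare equal, so a spurious row [1] appears.
  left.select("a").intersect(right.select("a")).show()

  sc.stop()
}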
spark git commit: [SPARK-10540] Fixes flaky all-data-type test
Repository: spark
Updated Branches: refs/heads/master 35e8ab939 -> 00a2911c5

[SPARK-10540] Fixes flaky all-data-type test

This PR breaks the original test case into multiple ones (one test case for each data type). In this way, test failure output becomes much more readable. Within each test case, we build a table with two columns: one holds the data type under test, and the other is an "index" column used to sort the DataFrame and work around [SPARK-10591] [1].

[1]: https://issues.apache.org/jira/browse/SPARK-10591

Author: Cheng Lian

Closes #8768 from liancheng/spark-10540/test-all-data-types.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00a2911c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00a2911c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00a2911c

Branch: refs/heads/master
Commit: 00a2911c5bea67a1a4796fb1d6fd5d0a8ee79001
Parents: 35e8ab9
Author: Cheng Lian
Authored: Fri Sep 18 12:19:08 2015 -0700
Committer: Yin Huai
Committed: Fri Sep 18 12:19:08 2015 -0700

--
 .../sql/sources/hadoopFsRelationSuites.scala    | 109 ---
 1 file changed, 43 insertions(+), 66 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/00a2911c/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
--
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 8ffcef8..d750493 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -100,80 +100,57 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  ignore("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-            StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      // Generate data at the driver side. We need to materialize the data first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
+  private val supportedDataTypes = Seq(
+    StringType, BinaryType,
+    NullType, BooleanType,
+    ByteType, ShortType, IntegerType, LongType,
+    FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+    DateType, TimestampType,
+    ArrayType(IntegerType),
+    MapType(StringType, LongType),
+    new StructType()
+      .add("f1", FloatType, nullable = true)
+      .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true),
+    new MyDenseVectorUDT()
+  ).filter(supportsDataType)
+
+  for (dataType <- supportedDataTypes) {
+    test(s"test all data types - $dataType") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+
+        val dataGenerator = RandomDataGenerator.forType(
+          dataType = dataType,
           nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
+          seed = Some(System.nanoTime())
+        ).getOrElse {
+          fail(s"Failed to create data generator for schema $dataType")
         }
-      }
 
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df = sqlContext.createDataFrame(rdd, schema)
+        // Create a DF for the schema with random data. The index field is used to sort
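The technique here, registering one ScalaTest case per element of a sequence while the suite is being constructed, generalizes beyond this file. A minimal standalone sketch (FunSuite, with a hypothetical list of type names standing in for the real DataType instances):

import org.scalatest.FunSuite

// Minimal sketch of the test-per-element pattern adopted by this PR:
// the for-loop runs at suite-construction time, so each data type gets
// its own independently reported test case.
class PerTypeSuite extends FunSuite {
  // Hypothetical stand-ins for the real DataType instances.
  private val typeNames = Seq("StringType", "IntegerType", "DecimalType(25,5)")

  for (name <- typeNames) {
    test(s"round-trip - $name") {
      // The real suite generates random data for the type and round-trips it;
      // this placeholder assertion just keeps the sketch runnable.
      assert(name.nonEmpty)
    }
  }
}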
spark git commit: [SPARK-10539] [SQL] Project should not be pushed down through Intersect or Except #8742
Repository: spark
Updated Branches: refs/heads/branch-1.5 e1e781f04 -> 3df52ccfa

[SPARK-10539] [SQL] Project should not be pushed down through Intersect or Except #8742

Intersect and Except are both set operators, and they use all of the columns to compare equality between rows. When their Project parent is pushed down through them, the relations they are based on change, so the transformation is not equivalent.

JIRA: https://issues.apache.org/jira/browse/SPARK-10539

I added some comments based on the fix of https://github.com/apache/spark/pull/8742.

Author: Yijie Shen
Author: Yin Huai

Closes #8823 from yhuai/fix_set_optimization.

(cherry picked from commit c6f8135ee52202bd86adb090ab631e80330ea4df)
Signed-off-by: Yin Huai

Conflicts:
    sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3df52ccf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3df52ccf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3df52ccf

Branch: refs/heads/branch-1.5
Commit: 3df52ccfa701f759bd60fe048d47d3664769b37f
Parents: e1e781f
Author: Yijie Shen
Authored: Fri Sep 18 13:20:13 2015 -0700
Committer: Yin Huai
Committed: Fri Sep 18 13:23:10 2015 -0700

--
 .../sql/catalyst/optimizer/Optimizer.scala      | 37 ++--
 .../optimizer/SetOperationPushDownSuite.scala   | 23 ++--
 .../org/apache/spark/sql/DataFrameSuite.scala   |  9 +
 3 files changed, 39 insertions(+), 30 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3df52ccf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a43..ce6abc7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -85,7 +85,22 @@ object SamplePushDown extends Rule[LogicalPlan] {
 }
 
 /**
- * Pushes operations to either side of a Union, Intersect or Except.
+ * Pushes certain operations to both sides of a Union, Intersect or Except operator.
+ * Operations that are safe to pushdown are listed as follows.
+ * Union:
+ *   Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is
+ *   safe to pushdown Filters and Projections through it. Once we add UNION DISTINCT,
+ *   we will not be able to pushdown Projections.
+ *
+ * Intersect:
+ *   It is not safe to pushdown Projections through it because we need to get the
+ *   intersect of rows by comparing the entire rows. It is fine to pushdown Filters
+ *   because we will not have non-deterministic expressions.
+ *
+ * Except:
+ *   It is not safe to pushdown Projections through it because we need to get the
+ *   intersect of rows by comparing the entire rows. It is fine to pushdown Filters
+ *   because we will not have non-deterministic expressions.
+ */
 object SetOperationPushDown extends Rule[LogicalPlan] {
@@ -122,40 +137,26 @@ object SetOperationPushDown extends Rule[LogicalPlan] {
         Filter(condition, left),
         Filter(pushToRight(condition, rewrites), right))
 
-    // Push down projection into union
+    // Push down projection through UNION ALL
     case Project(projectList, u @ Union(left, right)) =>
       val rewrites = buildRewrites(u)
       Union(
         Project(projectList, left),
         Project(projectList.map(pushToRight(_, rewrites)), right))
 
-    // Push down filter into intersect
+    // Push down filter through INTERSECT
     case Filter(condition, i @ Intersect(left, right)) =>
       val rewrites = buildRewrites(i)
       Intersect(
         Filter(condition, left),
         Filter(pushToRight(condition, rewrites), right))
 
-    // Push down projection into intersect
-    case Project(projectList, i @ Intersect(left, right)) =>
-      val rewrites = buildRewrites(i)
-      Intersect(
-        Project(projectList, left),
-        Project(projectList.map(pushToRight(_, rewrites)), right))
-
-    // Push down filter into except
+    // Push down filter through EXCEPT
     case Filter(condition, e @ Except(left, right)) =>
       val rewrites = buildRewrites(e)
       Except(
         Filter(condition, left),
         Filter(pushToRight(condition, rewrites), right))
-
-    // Push down projection into except
-    case Project(projectList, e @ Except(left, right)) =>
-      val rewrites = buildRewrites(e)
-      Except(
-
spark git commit: [SPARK-10540] Fixes flaky all-data-type test
Repository: spark
Updated Branches: refs/heads/branch-1.5 2c6a51e14 -> e1e781f04

[SPARK-10540] Fixes flaky all-data-type test

This PR breaks the original test case into multiple ones (one test case for each data type). In this way, test failure output becomes much more readable. Within each test case, we build a table with two columns: one holds the data type under test, and the other is an "index" column used to sort the DataFrame and work around [SPARK-10591] [1].

[1]: https://issues.apache.org/jira/browse/SPARK-10591

Author: Cheng Lian

Closes #8768 from liancheng/spark-10540/test-all-data-types.

(cherry picked from commit 00a2911c5bea67a1a4796fb1d6fd5d0a8ee79001)
Signed-off-by: Yin Huai

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1e781f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1e781f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1e781f0

Branch: refs/heads/branch-1.5
Commit: e1e781f04963a69f9b2d0be664ed2457016d94d2
Parents: 2c6a51e
Author: Cheng Lian
Authored: Fri Sep 18 12:19:08 2015 -0700
Committer: Yin Huai
Committed: Fri Sep 18 12:19:23 2015 -0700

--
 .../sql/sources/hadoopFsRelationSuites.scala    | 109 ---
 1 file changed, 43 insertions(+), 66 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e1e781f0/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
--
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 0f1cddd..14efe40 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -102,80 +102,57 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
     }
   }
 
-  ignore("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-            StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      // Generate data at the driver side. We need to materialize the data first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
+  private val supportedDataTypes = Seq(
+    StringType, BinaryType,
+    NullType, BooleanType,
+    ByteType, ShortType, IntegerType, LongType,
+    FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+    DateType, TimestampType,
+    ArrayType(IntegerType),
+    MapType(StringType, LongType),
+    new StructType()
+      .add("f1", FloatType, nullable = true)
+      .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true),
+    new MyDenseVectorUDT()
+  ).filter(supportsDataType)
+
+  for (dataType <- supportedDataTypes) {
+    test(s"test all data types - $dataType") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+
+        val dataGenerator = RandomDataGenerator.forType(
+          dataType = dataType,
          nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
+          seed = Some(System.nanoTime())
+        ).getOrElse {
+          fail(s"Failed to create data generator for schema $dataType")
        }
-      }
 
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df =
spark git commit: [SPARK-10449] [SQL] Don't merge decimal types with incompatible precision or scales
Repository: spark
Updated Branches: refs/heads/master c6f8135ee -> 3a22b1004

[SPARK-10449] [SQL] Don't merge decimal types with incompatible precision or scales

From JIRA: Schema merging should only handle struct fields. But currently we also reconcile decimal precision and scale information.

Author: Holden Karau

Closes #8634 from holdenk/SPARK-10449-dont-merge-different-precision.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a22b100
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a22b100
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a22b100

Branch: refs/heads/master
Commit: 3a22b1004f527d54d399dd0225cd7f2f8ffad9c5
Parents: c6f8135
Author: Holden Karau
Authored: Fri Sep 18 13:47:14 2015 -0700
Committer: Cheng Lian
Committed: Fri Sep 18 13:47:14 2015 -0700

--
 .../org/apache/spark/sql/types/StructType.scala | 17 +
 1 file changed, 13 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3a22b100/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index b29cf22..d6b4367 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -373,10 +373,19 @@ object StructType extends AbstractDataType {
       StructType(newFields)
 
     case (DecimalType.Fixed(leftPrecision, leftScale),
-        DecimalType.Fixed(rightPrecision, rightScale)) =>
-      DecimalType(
-        max(leftScale, rightScale) + max(leftPrecision - leftScale, rightPrecision - rightScale),
-        max(leftScale, rightScale))
+      DecimalType.Fixed(rightPrecision, rightScale)) =>
+      if ((leftPrecision == rightPrecision) && (leftScale == rightScale)) {
+        DecimalType(leftPrecision, leftScale)
+      } else if ((leftPrecision != rightPrecision) && (leftScale != rightScale)) {
+        throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          s"precision $leftPrecision and $rightPrecision & scale $leftScale and $rightScale")
+      } else if (leftPrecision != rightPrecision) {
+        throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          s"precision $leftPrecision and $rightPrecision")
+      } else {
+        throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          s"scala $leftScale and $rightScale")
+      }
 
     case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
         if leftUdt.userClass == rightUdt.userClass => leftUdt
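For intuition, the patched rule accepts a Decimal/Decimal merge only when precision and scale match exactly; any mismatch now fails loudly, where the removed code silently widened to max(scale) + max(precision - scale). A hedged, standalone sketch that mirrors the new rule outside of Spark (the helper name and exception type are illustrative; the real logic lives in StructType.merge and throws SparkException):

import org.apache.spark.sql.types.DecimalType

// Illustrative re-statement of the patched merge rule for Decimal fields.
def mergeDecimals(l: DecimalType, r: DecimalType): DecimalType =
  if (l.precision == r.precision && l.scale == r.scale) {
    l // identical fixed-point types merge to themselves
  } else {
    // Any mismatch in precision and/or scale is now an error.
    throw new IllegalArgumentException(
      s"Cannot merge $l with $r: precision and scale must match exactly")
  }

// mergeDecimals(DecimalType(25, 5), DecimalType(25, 5))  // DecimalType(25,5)
// mergeDecimals(DecimalType(25, 5), DecimalType(6, 5))   // throws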
spark git commit: [SPARK-10449] [SQL] Don't merge decimal types with incompatible precision or scales
Repository: spark
Updated Branches: refs/heads/branch-1.5 3df52ccfa -> 4051fffaa

[SPARK-10449] [SQL] Don't merge decimal types with incompatible precision or scales

From JIRA: Schema merging should only handle struct fields. But currently we also reconcile decimal precision and scale information.

Author: Holden Karau

Closes #8634 from holdenk/SPARK-10449-dont-merge-different-precision.

(cherry picked from commit 3a22b1004f527d54d399dd0225cd7f2f8ffad9c5)
Signed-off-by: Cheng Lian

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4051fffa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4051fffa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4051fffa

Branch: refs/heads/branch-1.5
Commit: 4051fffa2533dacda7ec91650cc0675ce8a65cc
Parents: 3df52cc
Author: Holden Karau
Authored: Fri Sep 18 13:47:14 2015 -0700
Committer: Cheng Lian
Committed: Fri Sep 18 13:47:38 2015 -0700

--
 .../org/apache/spark/sql/types/StructType.scala | 17 +
 1 file changed, 13 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4051fffa/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index b29cf22..d6b4367 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -373,10 +373,19 @@ object StructType extends AbstractDataType {
       StructType(newFields)
 
     case (DecimalType.Fixed(leftPrecision, leftScale),
-        DecimalType.Fixed(rightPrecision, rightScale)) =>
-      DecimalType(
-        max(leftScale, rightScale) + max(leftPrecision - leftScale, rightPrecision - rightScale),
-        max(leftScale, rightScale))
+      DecimalType.Fixed(rightPrecision, rightScale)) =>
+      if ((leftPrecision == rightPrecision) && (leftScale == rightScale)) {
+        DecimalType(leftPrecision, leftScale)
+      } else if ((leftPrecision != rightPrecision) && (leftScale != rightScale)) {
+        throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          s"precision $leftPrecision and $rightPrecision & scale $leftScale and $rightScale")
+      } else if (leftPrecision != rightPrecision) {
+        throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          s"precision $leftPrecision and $rightPrecision")
+      } else {
+        throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          s"scala $leftScale and $rightScale")
+      }
 
     case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
         if leftUdt.userClass == rightUdt.userClass => leftUdt
spark git commit: [SPARK-10623] [SQL] Fixes ORC predicate push-down
Repository: spark
Updated Branches: refs/heads/master c8149ef2c -> 22be2ae14

[SPARK-10623] [SQL] Fixes ORC predicate push-down

When pushing down a leaf predicate, the ORC `SearchArgument` builder requires an extra "parent" predicate (any one among `AND`/`OR`/`NOT`) to wrap the leaf predicate. E.g., to push down `a < 1`, we must build `AND(a < 1)` instead. Fortunately, when actually constructing the `SearchArgument`, the builder eliminates all those unnecessary wrappers.

This PR is based on #8783 authored by zhzhan. I also took the chance to simplify `OrcFilters` a little bit to improve readability.

Author: Cheng Lian

Closes #8799 from liancheng/spark-10623/fix-orc-ppd.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22be2ae1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22be2ae1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22be2ae1

Branch: refs/heads/master
Commit: 22be2ae147a111e88896f6fb42ed46bbf108a99b
Parents: c8149ef
Author: Cheng Lian
Authored: Fri Sep 18 18:42:20 2015 -0700
Committer: Yin Huai
Committed: Fri Sep 18 18:42:20 2015 -0700

--
 .../apache/spark/sql/hive/orc/OrcFilters.scala  | 56
 .../spark/sql/hive/orc/OrcQuerySuite.scala      | 30 +++
 2 files changed, 52 insertions(+), 34 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/22be2ae1/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index b3d9f7f..27193f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -31,11 +31,13 @@ import org.apache.spark.sql.sources._
  * and cannot be used anymore.
  */
 private[orc] object OrcFilters extends Logging {
-  def createFilter(expr: Array[Filter]): Option[SearchArgument] = {
-    expr.reduceOption(And).flatMap { conjunction =>
-      val builder = SearchArgumentFactory.newBuilder()
-      buildSearchArgument(conjunction, builder).map(_.build())
-    }
+  def createFilter(filters: Array[Filter]): Option[SearchArgument] = {
+    for {
+      // Combines all filters with `And`s to produce a single conjunction predicate
+      conjunction <- filters.reduceOption(And)
+      // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
+      builder <- buildSearchArgument(conjunction, SearchArgumentFactory.newBuilder())
+    } yield builder.build()
   }
 
   private def buildSearchArgument(expression: Filter, builder: Builder): Option[Builder] = {
@@ -102,46 +104,32 @@ private[orc] object OrcFilters extends Logging {
         negate <- buildSearchArgument(child, builder.startNot())
       } yield negate.end()
 
-      case EqualTo(attribute, value) =>
-        Option(value)
-          .filter(isSearchableLiteral)
-          .map(builder.equals(attribute, _))
+      case EqualTo(attribute, value) if isSearchableLiteral(value) =>
+        Some(builder.startAnd().equals(attribute, value).end())
 
-      case EqualNullSafe(attribute, value) =>
-        Option(value)
-          .filter(isSearchableLiteral)
-          .map(builder.nullSafeEquals(attribute, _))
+      case EqualNullSafe(attribute, value) if isSearchableLiteral(value) =>
+        Some(builder.startAnd().nullSafeEquals(attribute, value).end())
 
-      case LessThan(attribute, value) =>
-        Option(value)
-          .filter(isSearchableLiteral)
-          .map(builder.lessThan(attribute, _))
+      case LessThan(attribute, value) if isSearchableLiteral(value) =>
+        Some(builder.startAnd().lessThan(attribute, value).end())
 
-      case LessThanOrEqual(attribute, value) =>
-        Option(value)
-          .filter(isSearchableLiteral)
-          .map(builder.lessThanEquals(attribute, _))
+      case LessThanOrEqual(attribute, value) if isSearchableLiteral(value) =>
+        Some(builder.startAnd().lessThanEquals(attribute, value).end())
 
-      case GreaterThan(attribute, value) =>
-        Option(value)
-          .filter(isSearchableLiteral)
-          .map(builder.startNot().lessThanEquals(attribute, _).end())
+      case GreaterThan(attribute, value) if isSearchableLiteral(value) =>
+        Some(builder.startNot().lessThanEquals(attribute, value).end())
 
-      case GreaterThanOrEqual(attribute, value) =>
-        Option(value)
-          .filter(isSearchableLiteral)
-          .map(builder.startNot().lessThan(attribute, _).end())
+      case GreaterThanOrEqual(attribute, value) if
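For context, the shape of the fix against the Hive 1.x builder API used here (the two-argument leaf methods, as seen in the diff): every leaf is emitted inside an enclosing startAnd()/startNot() ... end() pair, and build() later collapses the redundant single-child wrapper. A hedged sketch:

import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory

// Hedged sketch: push down `a < 1` as AND(a < 1) rather than a bare leaf,
// which is what the SearchArgument builder requires. build() eliminates
// the unnecessary single-child AND wrapper.
val sarg = SearchArgumentFactory
  .newBuilder()
  .startAnd()
  .lessThan("a", Integer.valueOf(1))
  .end()
  .build()

// Prints roughly: leaf-0 = (LESS_THAN a 1), expr = leaf-0
println(sarg)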
spark git commit: [SPARK-10623] [SQL] Fixes ORC predicate push-down
Repository: spark
Updated Branches: refs/heads/branch-1.5 a6c315358 -> b3f1e6533

[SPARK-10623] [SQL] Fixes ORC predicate push-down

When pushing down a leaf predicate, the ORC `SearchArgument` builder requires an extra "parent" predicate (any one among `AND`/`OR`/`NOT`) to wrap the leaf predicate. E.g., to push down `a < 1`, we must build `AND(a < 1)` instead. Fortunately, when actually constructing the `SearchArgument`, the builder eliminates all those unnecessary wrappers.

This PR is based on #8783 authored by zhzhan. I also took the chance to simplify `OrcFilters` a little bit to improve readability.

Author: Cheng Lian

Closes #8799 from liancheng/spark-10623/fix-orc-ppd.

(cherry picked from commit 22be2ae147a111e88896f6fb42ed46bbf108a99b)
Signed-off-by: Yin Huai

Conflicts:
    sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3f1e653
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3f1e653
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3f1e653

Branch: refs/heads/branch-1.5
Commit: b3f1e653320e074fe78971a2a3b659c36da20b45
Parents: a6c3153
Author: Cheng Lian
Authored: Fri Sep 18 18:42:20 2015 -0700
Committer: Yin Huai
Committed: Fri Sep 18 18:46:53 2015 -0700

--
 .../apache/spark/sql/hive/orc/OrcFilters.scala  | 53 +---
 .../spark/sql/hive/orc/OrcQuerySuite.scala      | 30 +++
 2 files changed, 53 insertions(+), 30 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/b3f1e653/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 86142e5..27193f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -31,11 +31,13 @@ import org.apache.spark.sql.sources._
  * and cannot be used anymore.
  */
 private[orc] object OrcFilters extends Logging {
-  def createFilter(expr: Array[Filter]): Option[SearchArgument] = {
-    expr.reduceOption(And).flatMap { conjunction =>
-      val builder = SearchArgumentFactory.newBuilder()
-      buildSearchArgument(conjunction, builder).map(_.build())
-    }
+  def createFilter(filters: Array[Filter]): Option[SearchArgument] = {
+    for {
+      // Combines all filters with `And`s to produce a single conjunction predicate
+      conjunction <- filters.reduceOption(And)
+      // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
+      builder <- buildSearchArgument(conjunction, SearchArgumentFactory.newBuilder())
+    } yield builder.build()
   }
 
   private def buildSearchArgument(expression: Filter, builder: Builder): Option[Builder] = {
@@ -102,41 +104,32 @@ private[orc] object OrcFilters extends Logging {
         negate <- buildSearchArgument(child, builder.startNot())
       } yield negate.end()
 
-      case EqualTo(attribute, value) =>
-        Option(value)
-          .filter(isSearchableLiteral)
-          .map(builder.equals(attribute, _))
+      case EqualTo(attribute, value) if isSearchableLiteral(value) =>
+        Some(builder.startAnd().equals(attribute, value).end())
+
+      case EqualNullSafe(attribute, value) if isSearchableLiteral(value) =>
+        Some(builder.startAnd().nullSafeEquals(attribute, value).end())
 
-      case LessThan(attribute, value) =>
-        Option(value)
-          .filter(isSearchableLiteral)
-          .map(builder.lessThan(attribute, _))
+      case LessThan(attribute, value) if isSearchableLiteral(value) =>
+        Some(builder.startAnd().lessThan(attribute, value).end())
 
-      case LessThanOrEqual(attribute, value) =>
-        Option(value)
-          .filter(isSearchableLiteral)
-          .map(builder.lessThanEquals(attribute, _))
+      case LessThanOrEqual(attribute, value) if isSearchableLiteral(value) =>
+        Some(builder.startAnd().lessThanEquals(attribute, value).end())
 
-      case GreaterThan(attribute, value) =>
-        Option(value)
-          .filter(isSearchableLiteral)
-          .map(builder.startNot().lessThanEquals(attribute, _).end())
+      case GreaterThan(attribute, value) if isSearchableLiteral(value) =>
+        Some(builder.startNot().lessThanEquals(attribute, value).end())
 
-      case GreaterThanOrEqual(attribute, value) =>
-        Option(value)
-          .filter(isSearchableLiteral)
-          .map(builder.startNot().lessThan(attribute, _).end())
+      case
spark git commit: [SPARK-10611] Clone Configuration for each task for NewHadoopRDD
Repository: spark
Updated Branches: refs/heads/master 348d7c9a9 -> 8074208fa

[SPARK-10611] Clone Configuration for each task for NewHadoopRDD

This patch attempts to fix the Hadoop Configuration thread safety issue for NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD.

Author: Mingyu Kim

Closes #8763 from mingyukim/mkim/SPARK-10611.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8074208f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8074208f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8074208f

Branch: refs/heads/master
Commit: 8074208fa47fa654c1055c48cfa0d923edeeb04f
Parents: 348d7c9
Author: Mingyu Kim
Authored: Fri Sep 18 15:40:58 2015 -0700
Committer: Josh Rosen
Committed: Fri Sep 18 15:40:58 2015 -0700

--
 .../org/apache/spark/rdd/BinaryFileRDD.scala    |  5 +--
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 37
 2 files changed, 34 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/8074208f/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 6fec00d..aedced7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -34,12 +34,13 @@
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)

http://git-wip-us.apache.org/repos/asf/spark/blob/8074208f/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 174979a..2872b93 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
+  private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611). This
+      // problem occurs somewhat rarely because most jobs treat the configuration as though it's
+      // immutable. One solution, implemented here, is to clone the Configuration object.
+      // Unfortunately, this clone can be very expensive. To avoid unexpected performance
+      // regressions for workloads and Hadoop versions that do not suffer from these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
@@ -104,7 +124,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
 
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
@@ -230,12 +250,16 @@ class NewHadoopRDD[K, V](
     super.persist(storageLevel)
   }
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
   /**
+   * Configuration's constructor
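Usage note: the cloning is opt-in via the same configuration key SPARK-2546 introduced for HadoopRDD (documented in configuration.md as spark.hadoop.cloneConf). A minimal sketch of enabling it for a job whose tasks mutate the shared Hadoop Configuration:

import org.apache.spark.{SparkConf, SparkContext}

object CloneConfDemo extends App {
  // Opt in to per-task cloning of the Hadoop Configuration. Disabled by
  // default because the clone can be expensive (see the comment in the diff).
  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("clone-conf-demo")
    .set("spark.hadoop.cloneConf", "true")

  val sc = new SparkContext(conf)
  // newAPIHadoopFile / newAPIHadoopRDD reads now see a cloned, task-local
  // Configuration instead of the shared broadcast instance.
  sc.stop()
}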
spark git commit: [SPARK-10611] Clone Configuration for each task for NewHadoopRDD
Repository: spark
Updated Branches: refs/heads/branch-1.5 4051fffaa -> a6c315358

[SPARK-10611] Clone Configuration for each task for NewHadoopRDD

This patch attempts to fix the Hadoop Configuration thread safety issue for NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD.

Author: Mingyu Kim

Closes #8763 from mingyukim/mkim/SPARK-10611.

(cherry picked from commit 8074208fa47fa654c1055c48cfa0d923edeeb04f)
Signed-off-by: Josh Rosen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6c31535
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6c31535
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6c31535

Branch: refs/heads/branch-1.5
Commit: a6c315358b4517c461beabd5cd319d56d9fddd57
Parents: 4051fff
Author: Mingyu Kim
Authored: Fri Sep 18 15:40:58 2015 -0700
Committer: Josh Rosen
Committed: Fri Sep 18 16:24:40 2015 -0700

--
 .../org/apache/spark/rdd/BinaryFileRDD.scala    |  1 +
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 33 +---
 2 files changed, 30 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a6c31535/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 1f755db..a9e78e8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -34,6 +34,7 @@
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
         configurable.setConf(conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6c31535/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6a9c004..796151e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
+  private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611). This
+      // problem occurs somewhat rarely because most jobs treat the configuration as though it's
+      // immutable. One solution, implemented here, is to clone the Configuration object.
+      // Unfortunately, this clone can be very expensive. To avoid unexpected performance
+      // regressions for workloads and Hadoop versions that do not suffer from these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
@@ -104,7 +124,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
 
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
@@ -230,12 +250,16 @@ class NewHadoopRDD[K, V](
     super.persist(storageLevel)
   }
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
   /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
+  /**
   *
spark git commit: [MINOR] [ML] override toString of AttributeGroup
Repository: spark
Updated Branches: refs/heads/master 8074208fa -> c8149ef2c

[MINOR] [ML] override toString of AttributeGroup

This makes equality test failures much more readable.

mengxr

Author: Eric Liang
Author: Eric Liang

Closes #8826 from ericl/attrgroupstr.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8149ef2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8149ef2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8149ef2

Branch: refs/heads/master
Commit: c8149ef2c57f5c47ab97ee8d8d58a216d4bc4156
Parents: 8074208
Author: Eric Liang
Authored: Fri Sep 18 16:23:05 2015 -0700
Committer: Xiangrui Meng
Committed: Fri Sep 18 16:23:05 2015 -0700

--
 .../main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala | 2 ++
 1 file changed, 2 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c8149ef2/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala b/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala
index 457c158..2c29eeb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala
@@ -183,6 +183,8 @@ class AttributeGroup private (
     sum = 37 * sum + attributes.map(_.toSeq).hashCode
     sum
   }
+
+  override def toString: String = toMetadata.toString
 }
 
 /**
spark git commit: [SPARK-10615] [PYSPARK] change assertEquals to assertEqual
Repository: spark
Updated Branches: refs/heads/master 20fd35dfd -> 35e8ab939

[SPARK-10615] [PYSPARK] change assertEquals to assertEqual

As ```assertEquals``` is deprecated, we need to change ```assertEquals``` to ```assertEqual``` in the existing Python unit tests.

Author: Yanbo Liang

Closes #8814 from yanboliang/spark-10615.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35e8ab93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35e8ab93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35e8ab93

Branch: refs/heads/master
Commit: 35e8ab939000d4a1a01c1af4015c25ff6f4013a3
Parents: 20fd35d
Author: Yanbo Liang
Authored: Fri Sep 18 09:53:52 2015 -0700
Committer: Xiangrui Meng
Committed: Fri Sep 18 09:53:52 2015 -0700

--
 python/pyspark/ml/tests.py        |  16 ++--
 python/pyspark/mllib/tests.py     | 162 -
 python/pyspark/sql/tests.py       |  18 ++--
 python/pyspark/streaming/tests.py |   2 +-
 4 files changed, 99 insertions(+), 99 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/35e8ab93/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index b892318..648fa88 100644
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -182,7 +182,7 @@ class ParamTests(PySparkTestCase):
         self.assertEqual(testParams.getMaxIter(), 10)
         testParams.setMaxIter(100)
         self.assertTrue(testParams.isSet(maxIter))
-        self.assertEquals(testParams.getMaxIter(), 100)
+        self.assertEqual(testParams.getMaxIter(), 100)
         self.assertTrue(testParams.hasParam(inputCol))
         self.assertFalse(testParams.hasDefault(inputCol))
 
@@ -195,7 +195,7 @@ class ParamTests(PySparkTestCase):
         testParams._setDefault(seed=41)
         testParams.setSeed(43)
-        self.assertEquals(
+        self.assertEqual(
             testParams.explainParams(),
             "\n".join(["inputCol: input column name (undefined)",
                        "maxIter: max number of iterations (>= 0) (default: 10, current: 100)",
 
@@ -264,23 +264,23 @@ class FeatureTests(PySparkTestCase):
         self.assertEqual(ngram0.getInputCol(), "input")
         self.assertEqual(ngram0.getOutputCol(), "output")
         transformedDF = ngram0.transform(dataset)
-        self.assertEquals(transformedDF.head().output, ["a b c d", "b c d e"])
+        self.assertEqual(transformedDF.head().output, ["a b c d", "b c d e"])
 
     def test_stopwordsremover(self):
         sqlContext = SQLContext(self.sc)
         dataset = sqlContext.createDataFrame([Row(input=["a", "panda"])])
         stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output")
         # Default
-        self.assertEquals(stopWordRemover.getInputCol(), "input")
+        self.assertEqual(stopWordRemover.getInputCol(), "input")
         transformedDF = stopWordRemover.transform(dataset)
-        self.assertEquals(transformedDF.head().output, ["panda"])
+        self.assertEqual(transformedDF.head().output, ["panda"])
         # Custom
         stopwords = ["panda"]
         stopWordRemover.setStopWords(stopwords)
-        self.assertEquals(stopWordRemover.getInputCol(), "input")
-        self.assertEquals(stopWordRemover.getStopWords(), stopwords)
+        self.assertEqual(stopWordRemover.getInputCol(), "input")
+        self.assertEqual(stopWordRemover.getStopWords(), stopwords)
         transformedDF = stopWordRemover.transform(dataset)
-        self.assertEquals(transformedDF.head().output, ["a"])
+        self.assertEqual(transformedDF.head().output, ["a"])
 
 
 class HasInducedError(Params):

http://git-wip-us.apache.org/repos/asf/spark/blob/35e8ab93/python/pyspark/mllib/tests.py
--
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 636f9a0..96cf134 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -166,13 +166,13 @@ class VectorTests(MLlibTestCase):
                           [1., 2., 3., 4.],
                           [1., 2., 3., 4.]])
         arr = pyarray.array('d', [0, 1, 2, 3])
-        self.assertEquals(10.0, sv.dot(dv))
+        self.assertEqual(10.0, sv.dot(dv))
         self.assertTrue(array_equal(array([3., 6., 9., 12.]), sv.dot(mat)))
-        self.assertEquals(30.0, dv.dot(dv))
+        self.assertEqual(30.0, dv.dot(dv))
         self.assertTrue(array_equal(array([10., 20., 30., 40.]), dv.dot(mat)))
-        self.assertEquals(30.0, lst.dot(dv))
+        self.assertEqual(30.0, lst.dot(dv))
         self.assertTrue(array_equal(array([10., 20., 30.,
svn commit: r1703900 [4/8] - /spark/site/docs/1.5.0/api/R/
Added: spark/site/docs/1.5.0/api/R/greatest.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/greatest.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/greatest.html (added)
+++ spark/site/docs/1.5.0/api/R/greatest.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,44 @@

greatest {SparkR}                                               R Documentation

greatest

Description

Returns the greatest value of the list of column names, skipping null values.
This function takes at least 2 parameters. It will return null if all
parameters are null.

Usage

## S4 method for signature 'Column'
greatest(x, ...)

greatest(x, ...)

See Also

Other normal_funcs: abs; bitwiseNOT; expr; ifelse; isNaN; least; lit; nanvl;
negate; randn; rand; when

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/groupBy.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/groupBy.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/groupBy.html (added)
+++ spark/site/docs/1.5.0/api/R/groupBy.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,71 @@

groupBy {SparkR}                                                R Documentation

GroupBy

Description

Groups the DataFrame using the specified columns, so we can run aggregation
on them.

Usage

## S4 method for signature 'DataFrame'
groupBy(x, ...)

## S4 method for signature 'DataFrame'
group_by(x, ...)

group_by(x, ...)

groupBy(x, ...)

Arguments

x: a DataFrame

Value

a GroupedData

See Also

GroupedData

Examples

## Not run:
##D # Compute the average for all numeric columns grouped by department.
##D avg(groupBy(df, "department"))
##D
##D # Compute the max age and average salary, grouped by department and gender.
##D agg(groupBy(df, "department", "gender"), salary = "avg", age = "max")
## End(Not run)

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/hashCode.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/hashCode.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/hashCode.html (added)
+++ spark/site/docs/1.5.0/api/R/hashCode.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,73 @@

hashCode {SparkR}                                               R Documentation

Compute the hashCode of an object

Description

Java-style function to compute the hashCode for the given object. Returns
an integer value.

Usage

hashCode(key)

Arguments

key: the object to be hashed

Details

This only works for integer, numeric and character types right now.

Value

the hash code as an integer

Examples

hashCode(1L)   # 1
## Error in eval(expr, envir, enclos): could not find function "hashCode"

hashCode(1.0)  # 1072693248
## Error in eval(expr, envir, enclos): could not find function "hashCode"

hashCode("1")  # 49
## Error in eval(expr, envir, enclos): could not find function "hashCode"

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/head.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/head.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/head.html (added)
+++ spark/site/docs/1.5.0/api/R/head.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,64 @@

head {SparkR}                                                   R Documentation

Head

Description

Return the first NUM rows of a DataFrame as a data.frame. If NUM
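These R pages document functions Spark exposes across its language APIs. As a rough PySpark counterpart to the groupBy example above (a sketch only; the column names and data are made up, using the 1.5-era SQLContext API):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql import functions as F

    sc = SparkContext(appName="groupby-sketch")
    sqlContext = SQLContext(sc)

    # Toy data matching the columns used in the R example.
    df = sqlContext.createDataFrame(
        [("eng", "f", 100.0, 30), ("eng", "m", 90.0, 45), ("sales", "f", 80.0, 50)],
        ["department", "gender", "salary", "age"])

    # Average of all numeric columns, grouped by department.
    df.groupBy("department").avg().show()

    # Max age and average salary, grouped by department and gender.
    df.groupBy("department", "gender").agg(F.avg("salary"), F.max("age")).show()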
svn commit: r1703900 [1/8] - /spark/site/docs/1.5.0/api/R/
Author: shivaram
Date: Fri Sep 18 16:25:35 2015
New Revision: 1703900

URL: http://svn.apache.org/viewvc?rev=1703900&view=rev
Log:
Add 1.5.0 R API docs back

Added:
    spark/site/docs/1.5.0/api/R/
    spark/site/docs/1.5.0/api/R/00Index.html
    spark/site/docs/1.5.0/api/R/00frame_toc.html
    spark/site/docs/1.5.0/api/R/DataFrame.html
    spark/site/docs/1.5.0/api/R/GroupedData.html
    spark/site/docs/1.5.0/api/R/PipelineModel-class.html
    spark/site/docs/1.5.0/api/R/R.css
    spark/site/docs/1.5.0/api/R/abs.html
    spark/site/docs/1.5.0/api/R/acos.html
    spark/site/docs/1.5.0/api/R/add_months.html
    spark/site/docs/1.5.0/api/R/agg.html
    spark/site/docs/1.5.0/api/R/alias.html
    spark/site/docs/1.5.0/api/R/approxCountDistinct.html
    spark/site/docs/1.5.0/api/R/arrange.html
    spark/site/docs/1.5.0/api/R/ascii.html
    spark/site/docs/1.5.0/api/R/asin.html
    spark/site/docs/1.5.0/api/R/atan.html
    spark/site/docs/1.5.0/api/R/atan2.html
    spark/site/docs/1.5.0/api/R/avg.html
    spark/site/docs/1.5.0/api/R/base64.html
    spark/site/docs/1.5.0/api/R/between.html
    spark/site/docs/1.5.0/api/R/bin.html
    spark/site/docs/1.5.0/api/R/bitwiseNOT.html
    spark/site/docs/1.5.0/api/R/cache-methods.html
    spark/site/docs/1.5.0/api/R/cache.html
    spark/site/docs/1.5.0/api/R/cacheTable.html
    spark/site/docs/1.5.0/api/R/cancelJobGroup.html
    spark/site/docs/1.5.0/api/R/cast.html
    spark/site/docs/1.5.0/api/R/cbrt.html
    spark/site/docs/1.5.0/api/R/ceil.html
    spark/site/docs/1.5.0/api/R/clearCache.html
    spark/site/docs/1.5.0/api/R/clearJobGroup.html
    spark/site/docs/1.5.0/api/R/collect-methods.html
    spark/site/docs/1.5.0/api/R/collect.html
    spark/site/docs/1.5.0/api/R/column.html
    spark/site/docs/1.5.0/api/R/columns.html
    spark/site/docs/1.5.0/api/R/concat.html
    spark/site/docs/1.5.0/api/R/concat_ws.html
    spark/site/docs/1.5.0/api/R/conv.html
    spark/site/docs/1.5.0/api/R/cos.html
    spark/site/docs/1.5.0/api/R/cosh.html
    spark/site/docs/1.5.0/api/R/count.html
    spark/site/docs/1.5.0/api/R/countDistinct.html
    spark/site/docs/1.5.0/api/R/crc32.html
    spark/site/docs/1.5.0/api/R/createDataFrame.html
    spark/site/docs/1.5.0/api/R/createExternalTable.html
    spark/site/docs/1.5.0/api/R/date_add.html
    spark/site/docs/1.5.0/api/R/date_format.html
    spark/site/docs/1.5.0/api/R/date_sub.html
    spark/site/docs/1.5.0/api/R/datediff.html
    spark/site/docs/1.5.0/api/R/dayofmonth.html
    spark/site/docs/1.5.0/api/R/dayofyear.html
    spark/site/docs/1.5.0/api/R/describe.html
    spark/site/docs/1.5.0/api/R/dim.html
    spark/site/docs/1.5.0/api/R/distinct.html
    spark/site/docs/1.5.0/api/R/dropTempTable.html
    spark/site/docs/1.5.0/api/R/dtypes.html
    spark/site/docs/1.5.0/api/R/except.html
    spark/site/docs/1.5.0/api/R/exp.html
    spark/site/docs/1.5.0/api/R/explain.html
    spark/site/docs/1.5.0/api/R/explode.html
    spark/site/docs/1.5.0/api/R/expm1.html
    spark/site/docs/1.5.0/api/R/expr.html
    spark/site/docs/1.5.0/api/R/factorial.html
    spark/site/docs/1.5.0/api/R/filter.html
    spark/site/docs/1.5.0/api/R/first.html
    spark/site/docs/1.5.0/api/R/floor.html
    spark/site/docs/1.5.0/api/R/format_number.html
    spark/site/docs/1.5.0/api/R/format_string.html
    spark/site/docs/1.5.0/api/R/from_unixtime.html
    spark/site/docs/1.5.0/api/R/from_utc_timestamp.html
    spark/site/docs/1.5.0/api/R/glm-formula-ANY-DataFrame-method.html
    spark/site/docs/1.5.0/api/R/glm.html
    spark/site/docs/1.5.0/api/R/greatest.html
    spark/site/docs/1.5.0/api/R/groupBy.html
    spark/site/docs/1.5.0/api/R/hashCode.html
    spark/site/docs/1.5.0/api/R/head.html
    spark/site/docs/1.5.0/api/R/hex.html
    spark/site/docs/1.5.0/api/R/hour.html
    spark/site/docs/1.5.0/api/R/hypot.html
    spark/site/docs/1.5.0/api/R/ifelse.html
    spark/site/docs/1.5.0/api/R/index.html
    spark/site/docs/1.5.0/api/R/infer_type.html
    spark/site/docs/1.5.0/api/R/initcap.html
    spark/site/docs/1.5.0/api/R/insertInto.html
    spark/site/docs/1.5.0/api/R/instr.html
    spark/site/docs/1.5.0/api/R/intersect.html
    spark/site/docs/1.5.0/api/R/isLocal.html
    spark/site/docs/1.5.0/api/R/isNaN.html
    spark/site/docs/1.5.0/api/R/join.html
    spark/site/docs/1.5.0/api/R/jsonFile.html
    spark/site/docs/1.5.0/api/R/last.html
    spark/site/docs/1.5.0/api/R/last_day.html
    spark/site/docs/1.5.0/api/R/least.html
    spark/site/docs/1.5.0/api/R/length.html
    spark/site/docs/1.5.0/api/R/levenshtein.html
    spark/site/docs/1.5.0/api/R/limit.html
    spark/site/docs/1.5.0/api/R/lit.html
    spark/site/docs/1.5.0/api/R/locate.html
    spark/site/docs/1.5.0/api/R/log.html
    spark/site/docs/1.5.0/api/R/log10.html
    spark/site/docs/1.5.0/api/R/log1p.html
    spark/site/docs/1.5.0/api/R/log2.html
    spark/site/docs/1.5.0/api/R/lower.html
    spark/site/docs/1.5.0/api/R/lpad.html
    spark/site/docs/1.5.0/api/R/ltrim.html
svn commit: r1703900 [5/8] - /spark/site/docs/1.5.0/api/R/
Added: spark/site/docs/1.5.0/api/R/log2.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/log2.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/log2.html (added)
+++ spark/site/docs/1.5.0/api/R/log2.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,67 @@

log2 {SparkR}                                                   R Documentation

log2

Description

Computes the logarithm of the given column in base 2.

Usage

## S4 method for signature 'Column'
log2(x)

See Also

Other math_funcs: acos; asin; atan2; atan; bin; cbrt; ceil; conv; cosh; cos;
expm1; exp; factorial; floor; hex; hypot; log10; log1p; log; pmod; rint;
round; shiftLeft; shiftRightUnsigned; shiftRight; signum; sinh; sin; sqrt;
tanh; tan; toDegrees; toRadians; unhex

Examples

## Not run: log2(df$c)

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/lower.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/lower.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/lower.html (added)
+++ spark/site/docs/1.5.0/api/R/lower.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,69 @@

lower {SparkR}                                                  R Documentation

lower

Description

Converts a string column to lower case.

Usage

## S4 method for signature 'Column'
lower(x)

lower(x)

See Also

Other string_funcs: ascii; base64; concat_ws; concat; format_number;
format_string; initcap; instr; length; levenshtein; locate; lpad; ltrim;
regexp_extract; regexp_replace; reverse; rpad; rtrim; soundex;
substring_index; translate; trim; unbase64; upper

Examples

## Not run: lower(df$c)

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/lpad.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/lpad.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/lpad.html (added)
+++ spark/site/docs/1.5.0/api/R/lpad.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,57 @@

lpad {SparkR}                                                   R Documentation

lpad

Description

Left-pad the string column with pad, to a length of len.

Usage

## S4 method for signature 'Column,numeric,character'
lpad(x, len, pad)

lpad(x, len, pad)

See Also

Other string_funcs: ascii; base64; concat_ws; concat; format_number;
format_string; initcap; instr; length; levenshtein; locate; lower; ltrim;
regexp_extract; regexp_replace; reverse; rpad; rtrim; soundex;
substring_index; translate; trim; unbase64; upper

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/ltrim.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/ltrim.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/ltrim.html (added)
+++ spark/site/docs/1.5.0/api/R/ltrim.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,69 @@

ltrim {SparkR}                                                  R Documentation

ltrim

Description
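The string functions in this batch also exist in pyspark.sql.functions under the same names; a self-contained sketch with toy data (1.5-era SQLContext API):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql import functions as F

    sc = SparkContext(appName="string-funcs-sketch")
    sqlContext = SQLContext(sc)
    df = sqlContext.createDataFrame([("  Spark",), ("R  ",)], ["s"])

    df.select(
        F.lower(df.s).alias("lowered"),        # convert the column to lower case
        F.lpad(df.s, 8, "*").alias("padded"),  # left-pad with '*' to length 8
        F.ltrim(df.s).alias("trimmed"),        # drop leading whitespace
    ).show()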
svn commit: r1703900 [3/8] - /spark/site/docs/1.5.0/api/R/
Added: spark/site/docs/1.5.0/api/R/createExternalTable.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/createExternalTable.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/createExternalTable.html (added)
+++ spark/site/docs/1.5.0/api/R/createExternalTable.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,76 @@

createExternalTable {SparkR}                                    R Documentation

Create an external table

Description

Creates an external table based on the dataset in a data source, and returns
the DataFrame associated with the external table.

Usage

createExternalTable(sqlContext, tableName, path = NULL, source = NULL, ...)

Arguments

sqlContext: SQLContext to use
tableName: a name of the table
path: the path of files to load
source: the name of the external data source

Details

The data source is specified by the 'source' and a set of options (...).
If 'source' is not specified, the default data source configured by
spark.sql.sources.default will be used.

Value

DataFrame

Examples

## Not run:
##D sc <- sparkR.init()
##D sqlContext <- sparkRSQL.init(sc)
##D df <- sparkRSQL.createExternalTable(sqlContext, "myjson", path="path/to/json", source="json")
## End(Not run)

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/date_add.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/date_add.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/date_add.html (added)
+++ spark/site/docs/1.5.0/api/R/date_add.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,57 @@

date_add {SparkR}                                               R Documentation

date_add

Description

Returns the date that is 'days' days after 'start'.

Usage

## S4 method for signature 'Column,numeric'
date_add(y, x)

date_add(y, x)

See Also

Other datetime_funcs: add_months; date_format; date_sub; datediff;
dayofmonth; dayofyear; from_unixtime; from_utc_timestamp; hour; last_day;
minute; months_between; month; next_day; quarter; second; to_date;
to_utc_timestamp; unix_timestamp; weekofyear; year

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/date_format.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/date_format.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/date_format.html (added)
+++ spark/site/docs/1.5.0/api/R/date_format.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,69 @@

date_format {SparkR}                                            R Documentation

date_format

Description

Converts a date/timestamp/string to a value of string in the format specified
by the date format given by the second argument.

Usage

## S4 method for signature 'Column,character'
date_format(y, x)

date_format(y, x)

Details

A pattern could be for instance dd.MM.yyyy and could return a string like
'18.03.1993'. All pattern letters of java.text.SimpleDateFormat can be used.

NOTE: Whenever possible, use specialized functions like year; these benefit
from a specialized implementation.

See Also

Other datetime_funcs: add_months; date_add; date_sub; datediff; dayofmonth;
dayofyear; from_unixtime; from_utc_timestamp; hour; last_day; minute;
months_between; month; next_day; quarter; second; to_date; to_utc_timestamp;
unix_timestamp; weekofyear; year

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/date_sub.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/date_sub.html?rev=1703900&view=auto
==============================================================================
---
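For orientation, the same datetime functions exist in pyspark.sql.functions; the dd.MM.yyyy pattern below reproduces the '18.03.1993' example from date_format. A sketch with made-up data, assuming Spark's implicit string-to-date cast:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql import functions as F

    sc = SparkContext(appName="datetime-sketch")
    sqlContext = SQLContext(sc)
    df = sqlContext.createDataFrame([("1993-03-18",)], ["d"])

    df.select(
        F.date_add(df.d, 7).alias("plus_week"),          # the date 7 days after d
        F.date_format(df.d, "dd.MM.yyyy").alias("fmt"),  # e.g. '18.03.1993'
    ).show()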
svn commit: r1703900 [6/8] - /spark/site/docs/1.5.0/api/R/
Added: spark/site/docs/1.5.0/api/R/randn.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/randn.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/randn.html (added)
+++ spark/site/docs/1.5.0/api/R/randn.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,59 @@

randn {SparkR}                                                  R Documentation

randn

Description

Generate a column with i.i.d. samples from the standard normal distribution.

Usage

## S4 method for signature 'missing'
randn(seed)

## S4 method for signature 'numeric'
randn(seed)

randn(seed)

See Also

Other normal_funcs: abs; bitwiseNOT; expr; greatest; ifelse; isNaN; least;
lit; nanvl; negate; rand; when

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/rbind.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/rbind.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/rbind.html (added)
+++ spark/site/docs/1.5.0/api/R/rbind.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,26 @@

rbind {SparkR}                                                  R Documentation

Union two or more DataFrames

Description

Returns a new DataFrame containing rows of all parameters.

Usage

## S4 method for signature 'DataFrame'
rbind(x, ..., deparse.level = 1)

rbind(..., deparse.level = 1)

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/read.df.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/read.df.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/read.df.html (added)
+++ spark/site/docs/1.5.0/api/R/read.df.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,71 @@

read.df {SparkR}                                                R Documentation

Load a DataFrame

Description

Returns the dataset in a data source as a DataFrame.

Usage

read.df(sqlContext, path = NULL, source = NULL, schema = NULL, ...)

Arguments

sqlContext: SQLContext to use
path: the path of files to load
source: the name of the external data source

Details

The data source is specified by the 'source' and a set of options (...).
If 'source' is not specified, the default data source configured by
spark.sql.sources.default will be used.

Value

DataFrame

Examples

## Not run:
##D sc <- sparkR.init()
##D sqlContext <- sparkRSQL.init(sc)
##D df <- read.df(sqlContext, "path/to/file.json", source = "json")
## End(Not run)

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/regexp_extract.html
URL: http://svn.apache.org/viewvc/spark/site/docs/1.5.0/api/R/regexp_extract.html?rev=1703900&view=auto
==============================================================================
--- spark/site/docs/1.5.0/api/R/regexp_extract.html (added)
+++ spark/site/docs/1.5.0/api/R/regexp_extract.html Fri Sep 18 16:25:35 2015
@@ -0,0 +1,56 @@

regexp_extract {SparkR}                                         R Documentation

regexp_extract

Description

Extract a specific (idx) group identified by a Java regex from the specified
string column.

Usage

## S4 method for signature 'Column,character,numeric'
regexp_extract(x, pattern, idx)

regexp_extract(x, pattern, idx)

See Also

Other string_funcs: ascii; base64; concat_ws; concat; format_number;
format_string; initcap; instr; length; levenshtein; locate; lower; lpad;
ltrim; regexp_replace; reverse; rpad; rtrim; soundex; substring_index;
translate; trim; unbase64; upper

[Package SparkR version 1.5.0 Index]

Added: spark/site/docs/1.5.0/api/R/regexp_replace.html
URL:
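A PySpark counterpart to read.df and regexp_extract (a sketch; the JSON path and the 'value' column are hypothetical):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql import functions as F

    sc = SparkContext(appName="read-df-sketch")
    sqlContext = SQLContext(sc)

    # Equivalent of read.df(sqlContext, "path/to/file.json", source = "json").
    df = sqlContext.read.format("json").load("path/to/file.json")

    # Pull group 1 of a Java regex out of a string column named 'value'.
    df.select(F.regexp_extract(df.value, "(\\d+)", 1).alias("first_number")).show()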
spark git commit: [SPARK-10451] [SQL] Prevent unnecessary serializations in InMemoryColumnarTableScan
Repository: spark
Updated Branches:
  refs/heads/master e3b5d6cb2 -> 20fd35dfd

[SPARK-10451] [SQL] Prevent unnecessary serializations in InMemoryColumnarTableScan

Many of the fields in InMemoryColumnarTableScan and InMemoryRelation can be made
transient. This reduces my 1000 ms job to about 700 ms; the task size shrinks
from 2.8 MB to ~1300 KB.

Author: Yash Datta

Closes #8604 from saucam/serde.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20fd35df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20fd35df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20fd35df

Branch: refs/heads/master
Commit: 20fd35dfd1ac402b622604e7bbedcc53a580b0a2
Parents: e3b5d6c
Author: Yash Datta
Authored: Fri Sep 18 08:22:38 2015 -0700
Committer: Yin Huai
Committed: Fri Sep 18 08:22:38 2015 -0700

----------------------------------------------------------------------
 .../columnar/InMemoryColumnarTableScan.scala    | 35 +++++++++++++++++++++--------------
 1 file changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20fd35df/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 66d429b..d7e145f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -48,10 +48,10 @@ private[sql] case class InMemoryRelation(
     useCompression: Boolean,
     batchSize: Int,
     storageLevel: StorageLevel,
-    child: SparkPlan,
+    @transient child: SparkPlan,
     tableName: Option[String])(
-    private var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    private var _statistics: Statistics = null,
+    @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null,
+    @transient private var _statistics: Statistics = null,
     private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
   extends LogicalPlan with MultiInstanceRelation {

@@ -62,7 +62,7 @@ private[sql] case class InMemoryRelation(
     _batchStats
   }

-  val partitionStatistics = new PartitionStatistics(output)
+  @transient val partitionStatistics = new PartitionStatistics(output)

   private def computeSizeInBytes = {
     val sizeOfRow: Expression =
@@ -196,7 +196,7 @@ private[sql] case class InMemoryRelation(
 private[sql] case class InMemoryColumnarTableScan(
     attributes: Seq[Attribute],
     predicates: Seq[Expression],
-    relation: InMemoryRelation)
+    @transient relation: InMemoryRelation)
   extends LeafNode {

   override def output: Seq[Attribute] = attributes
@@ -205,7 +205,7 @@ private[sql] case class InMemoryColumnarTableScan(
   // Returned filter predicate should return false iff it is impossible for the input expression
   // to evaluate to `true' based on statistics collected about this partition batch.
-  val buildFilter: PartialFunction[Expression, Expression] = {
+  @transient val buildFilter: PartialFunction[Expression, Expression] = {
     case And(lhs: Expression, rhs: Expression)
       if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
       (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)

@@ -268,16 +268,23 @@ private[sql] case class InMemoryColumnarTableScan(
       readBatches.setValue(0)
     }

-    relation.cachedColumnBuffers.mapPartitions { cachedBatchIterator =>
-      val partitionFilter = newPredicate(
-        partitionFilters.reduceOption(And).getOrElse(Literal(true)),
-        relation.partitionStatistics.schema)
+    // Using these variables here to avoid serialization of entire objects (if referenced
+    // directly) within the map Partitions closure.
+    val schema = relation.partitionStatistics.schema
+    val schemaIndex = schema.zipWithIndex
+    val relOutput = relation.output
+    val buffers = relation.cachedColumnBuffers
+
+    buffers.mapPartitions { cachedBatchIterator =>
+      val partitionFilter = newPredicate(
+        partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+        schema)

       // Find the ordinals and data types of the requested columns. If none are requested, use the
       // narrowest (the field with minimum default element size).
       val (requestedColumnIndices, requestedColumnDataTypes) = if (attributes.isEmpty) {
         val (narrowestOrdinal, narrowestDataType) =
-          relation.output.zipWithIndex.map { case (a, ordinal) =>
+          relOutput.zipWithIndex.map { case (a, ordinal) =>
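The local-variable copies in this hunk are the standard closure-capture fix, and the same trap exists in PySpark: referencing self.field inside a function shipped to executors pickles the whole object. A sketch of the pattern under that assumption (the class and data are made up):

    class TableScan(object):
        def __init__(self, column_names, big_cached_state):
            self.column_names = column_names
            self.big_cached_state = big_cached_state  # large; must not ship with tasks

        def scan(self, rdd):
            # Bad: lambda row: dict(zip(self.column_names, row)) would capture
            # `self`, serializing big_cached_state into every task.
            names = self.column_names  # copy only what the closure needs
            return rdd.map(lambda row: dict(zip(names, row)))

Both serializers walk the object graph from whatever the closure references, so copying just the needed fields into locals bounds the task size, which is exactly what the Scala change above does with schema, relOutput, and buffers.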