spark git commit: [SPARK-16606][MINOR] Tiny follow-up to , to correct more instances of the same log message typo
Repository: spark Updated Branches: refs/heads/branch-2.0 d3c78c4f3 -> a0c03c925 [SPARK-16606][MINOR] Tiny follow-up to , to correct more instances of the same log message typo ## What changes were proposed in this pull request? Tiny follow-up to SPARK-16606 / https://github.com/apache/spark/pull/14533 , to correct more instances of the same log message typo ## How was this patch tested? Existing tests (no functional change anyway) Author: Sean OwenCloses #15586 from srowen/SPARK-16606.2. (cherry picked from commit 7178c56433cd138dae53db9194c55e3f4fa0fa69) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0c03c92 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0c03c92 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0c03c92 Branch: refs/heads/branch-2.0 Commit: a0c03c92545c147015308cce195dfc2e8a3074fb Parents: d3c78c4 Author: Sean Owen Authored: Fri Oct 21 22:20:52 2016 -0700 Committer: Reynold Xin Committed: Fri Oct 21 22:21:07 2016 -0700 -- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a0c03c92/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index a7de115..13d3e75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -802,7 +802,7 @@ object SparkSession { if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } if (options.nonEmpty) { - logWarning("Use an existing SparkSession, some configuration may not take effect.") + logWarning("Using an existing SparkSession; some configuration may not take effect.") } return session } @@ -814,7 +814,7 @@ object SparkSession { if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } if (options.nonEmpty) { -logWarning("Use an existing SparkSession, some configuration may not take effect.") +logWarning("Using an existing SparkSession; some configuration may not take effect.") } return session } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16606][MINOR] Tiny follow-up to , to correct more instances of the same log message typo
Repository: spark Updated Branches: refs/heads/master 3fbf5a58c -> 7178c5643 [SPARK-16606][MINOR] Tiny follow-up to , to correct more instances of the same log message typo ## What changes were proposed in this pull request? Tiny follow-up to SPARK-16606 / https://github.com/apache/spark/pull/14533 , to correct more instances of the same log message typo ## How was this patch tested? Existing tests (no functional change anyway) Author: Sean OwenCloses #15586 from srowen/SPARK-16606.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7178c564 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7178c564 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7178c564 Branch: refs/heads/master Commit: 7178c56433cd138dae53db9194c55e3f4fa0fa69 Parents: 3fbf5a5 Author: Sean Owen Authored: Fri Oct 21 22:20:52 2016 -0700 Committer: Reynold Xin Committed: Fri Oct 21 22:20:52 2016 -0700 -- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7178c564/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index baae550..3045eb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -814,7 +814,7 @@ object SparkSession { if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } if (options.nonEmpty) { - logWarning("Use an existing SparkSession, some configuration may not take effect.") + logWarning("Using an existing SparkSession; some configuration may not take effect.") } return session } @@ -826,7 +826,7 @@ object SparkSession { if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } if (options.nonEmpty) { -logWarning("Use an existing SparkSession, some configuration may not take effect.") +logWarning("Using an existing SparkSession; some configuration may not take effect.") } return session } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
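The warning corrected above is the one emitted by `SparkSession.getOrCreate` when a live session is reused and new options are applied to it. Below is a minimal, self-contained sketch (not part of the commit) of the situation that triggers it; the master URL, app name, and option value are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object ExistingSessionWarning {
  def main(args: Array[String]): Unit = {
    // First call creates the SparkContext and the default session.
    val first = SparkSession.builder()
      .master("local[2]")
      .appName("existing-session-warning")
      .getOrCreate()

    // Second call finds the live session and applies the extra option to it.
    // This is the code path that logs:
    //   "Using an existing SparkSession; some configuration may not take effect."
    val second = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()

    assert(first eq second) // the same session instance is returned
    first.stop()
  }
}
```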
spark git commit: [STREAMING][KAFKA][DOC] clarify kafka settings needed for larger batches
Repository: spark Updated Branches: refs/heads/branch-2.0 3e9840f1d -> d3c78c4f3 [STREAMING][KAFKA][DOC] clarify kafka settings needed for larger batches ## What changes were proposed in this pull request? Minor doc change to mention kafka configuration for larger spark batches. ## How was this patch tested? Doc change only, confirmed via jekyll. The configuration issue was discussed / confirmed with users on the mailing list. Author: cody koeningerCloses #15570 from koeninger/kafka-doc-heartbeat. (cherry picked from commit c9720b2195a465653690b3e221ce789142217b0d) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3c78c4f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3c78c4f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3c78c4f Branch: refs/heads/branch-2.0 Commit: d3c78c4f379a6ce3d055f935a6018d866606ebaf Parents: 3e9840f Author: cody koeninger Authored: Fri Oct 21 16:27:19 2016 -0700 Committer: Shixiong Zhu Committed: Fri Oct 21 16:27:26 2016 -0700 -- docs/streaming-kafka-0-10-integration.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d3c78c4f/docs/streaming-kafka-0-10-integration.md -- diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index 456b845..de95ea9 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -48,6 +48,7 @@ Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javad For possible kafkaParams, see [Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs). +If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase heartbeat.interval.ms and session.timeout.ms appropriately. For batches larger than 5 minutes, this will require changing group.max.session.timeout.ms on the broker. Note that the example sets enable.auto.commit to false, for discussion see [Storing Offsets](streaming-kafka-0-10-integration.html#storing-offsets) below. ### LocationStrategies - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [STREAMING][KAFKA][DOC] clarify kafka settings needed for larger batches
Repository: spark Updated Branches: refs/heads/master 268ccb9a4 -> c9720b219 [STREAMING][KAFKA][DOC] clarify kafka settings needed for larger batches ## What changes were proposed in this pull request? Minor doc change to mention kafka configuration for larger spark batches. ## How was this patch tested? Doc change only, confirmed via jekyll. The configuration issue was discussed / confirmed with users on the mailing list. Author: cody koeningerCloses #15570 from koeninger/kafka-doc-heartbeat. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c9720b21 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c9720b21 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c9720b21 Branch: refs/heads/master Commit: c9720b2195a465653690b3e221ce789142217b0d Parents: 268ccb9 Author: cody koeninger Authored: Fri Oct 21 16:27:19 2016 -0700 Committer: Shixiong Zhu Committed: Fri Oct 21 16:27:19 2016 -0700 -- docs/streaming-kafka-0-10-integration.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c9720b21/docs/streaming-kafka-0-10-integration.md -- diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index 456b845..de95ea9 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -48,6 +48,7 @@ Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javad For possible kafkaParams, see [Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs). +If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase heartbeat.interval.ms and session.timeout.ms appropriately. For batches larger than 5 minutes, this will require changing group.max.session.timeout.ms on the broker. Note that the example sets enable.auto.commit to false, for discussion see [Storing Offsets](streaming-kafka-0-10-integration.html#storing-offsets) below. ### LocationStrategies - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
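For context, a sketch of the consumer settings the added documentation line refers to, using the 0-10 direct stream API. The broker address, topic, group id, and exact timeout values are assumptions for illustration; the point is that both timeouts comfortably exceed the 2-minute batch duration while staying under the broker's `group.max.session.timeout.ms`.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object LargeBatchKafkaParams {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("large-batch-kafka").setMaster("local[2]")
    // A 2-minute batch is longer than the default Kafka session timeout,
    // so the consumer heartbeat/session settings are raised accordingly.
    val ssc = new StreamingContext(conf, Minutes(2))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",                // assumed broker address
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "large-batch-example",                    // assumed group id
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "heartbeat.interval.ms" -> (60000: java.lang.Integer),  // raised from the small default
      "session.timeout.ms" -> (180000: java.lang.Integer)     // above the batch duration, below the broker max
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("topicA"), kafkaParams))

    stream.foreachRDD(rdd => println(s"records in this batch: ${rdd.count()}"))
    ssc.start()
    ssc.awaitTermination()
  }
}
```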
spark git commit: [SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream
Repository: spark Updated Branches: refs/heads/branch-2.0 b113b5d9f -> 3e9840f1d [SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream ## What changes were proposed in this pull request? startingOffsets takes specific per-topicpartition offsets as a json argument, usable with any consumer strategy assign with specific topicpartitions as a consumer strategy ## How was this patch tested? Unit tests Author: cody koeningerCloses #15504 from koeninger/SPARK-17812. (cherry picked from commit 268ccb9a48dfefc4d7bc85155e7e20a2dfe89307) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e9840f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e9840f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e9840f1 Branch: refs/heads/branch-2.0 Commit: 3e9840f1d923a521d64bfc55fcbb6babd6045f06 Parents: b113b5d Author: cody koeninger Authored: Fri Oct 21 15:55:04 2016 -0700 Committer: Shixiong Zhu Committed: Fri Oct 21 15:55:11 2016 -0700 -- docs/structured-streaming-kafka-integration.md | 38 +-- .../apache/spark/sql/kafka010/JsonUtils.scala | 93 +++ .../apache/spark/sql/kafka010/KafkaSource.scala | 64 +-- .../sql/kafka010/KafkaSourceProvider.scala | 52 - .../spark/sql/kafka010/StartingOffsets.scala| 32 ++ .../spark/sql/kafka010/JsonUtilsSuite.scala | 45 .../spark/sql/kafka010/KafkaSourceSuite.scala | 114 +-- .../spark/sql/kafka010/KafkaTestUtils.scala | 14 ++- 8 files changed, 395 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3e9840f1/docs/structured-streaming-kafka-integration.md -- diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 668489a..e851f21 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -151,15 +151,24 @@ The following options must be set for the Kafka source. Optionvaluemeaning + assign + json string {"topicA":[0,1],"topicB":[2,4]} + Specific TopicPartitions to consume. + Only one of "assign", "subscribe" or "subscribePattern" + options can be specified for Kafka source. + + subscribe A comma-separated list of topics - The topic list to subscribe. Only one of "subscribe" and "subscribePattern" options can be - specified for Kafka source. + The topic list to subscribe. + Only one of "assign", "subscribe" or "subscribePattern" + options can be specified for Kafka source. subscribePattern Java regex string - The pattern used to subscribe the topic. Only one of "subscribe" and "subscribePattern" + The pattern used to subscribe to topic(s). + Only one of "assign, "subscribe" or "subscribePattern" options can be specified for Kafka source. @@ -174,16 +183,21 @@ The following configurations are optional: Optionvaluedefaultmeaning - startingOffset - ["earliest", "latest"] - "latest" - The start point when a query is started, either "earliest" which is from the earliest offset, - or "latest" which is just from the latest offset. Note: This only applies when a new Streaming q - uery is started, and that resuming will always pick up from where the query left off. + startingOffsets + earliest, latest, or json string + {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} + + latest + The start point when a query is started, either "earliest" which is from the earliest offsets, + "latest" which is just from the latest offsets, or a json string specifying a starting offset for + each TopicPartition. 
In the json, -2 as an offset can be used to refer to earliest, -1 to latest. + Note: This only applies when a new Streaming query is started, and that resuming will always pick + up from where the query left off. Newly discovered partitions during a query will start at + earliest. failOnDataLoss - [true, false] + true or false true Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work @@ -215,10 +229,10 @@ Kafka's own configurations can be set via `DataStreamReader.option` with `kafka. Note that the following Kafka params cannot be set and the Kafka source will throw an exception: - **group.id**: Kafka source will create a unique group id for each query automatically. -- **auto.offset.reset**: Set the source option `startingOffset` to `earliest` or `latest` to specify +-
spark git commit: [SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream
Repository: spark Updated Branches: refs/heads/master 140570252 -> 268ccb9a4 [SPARK-17812][SQL][KAFKA] Assign and specific startingOffsets for structured stream ## What changes were proposed in this pull request? startingOffsets takes specific per-topicpartition offsets as a json argument, usable with any consumer strategy assign with specific topicpartitions as a consumer strategy ## How was this patch tested? Unit tests Author: cody koeningerCloses #15504 from koeninger/SPARK-17812. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/268ccb9a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/268ccb9a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/268ccb9a Branch: refs/heads/master Commit: 268ccb9a48dfefc4d7bc85155e7e20a2dfe89307 Parents: 1405702 Author: cody koeninger Authored: Fri Oct 21 15:55:04 2016 -0700 Committer: Shixiong Zhu Committed: Fri Oct 21 15:55:04 2016 -0700 -- docs/structured-streaming-kafka-integration.md | 38 +-- .../apache/spark/sql/kafka010/JsonUtils.scala | 93 +++ .../apache/spark/sql/kafka010/KafkaSource.scala | 64 +-- .../sql/kafka010/KafkaSourceProvider.scala | 52 - .../spark/sql/kafka010/StartingOffsets.scala| 32 ++ .../spark/sql/kafka010/JsonUtilsSuite.scala | 45 .../spark/sql/kafka010/KafkaSourceSuite.scala | 114 +-- .../spark/sql/kafka010/KafkaTestUtils.scala | 14 ++- 8 files changed, 395 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/268ccb9a/docs/structured-streaming-kafka-integration.md -- diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 668489a..e851f21 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -151,15 +151,24 @@ The following options must be set for the Kafka source. Optionvaluemeaning + assign + json string {"topicA":[0,1],"topicB":[2,4]} + Specific TopicPartitions to consume. + Only one of "assign", "subscribe" or "subscribePattern" + options can be specified for Kafka source. + + subscribe A comma-separated list of topics - The topic list to subscribe. Only one of "subscribe" and "subscribePattern" options can be - specified for Kafka source. + The topic list to subscribe. + Only one of "assign", "subscribe" or "subscribePattern" + options can be specified for Kafka source. subscribePattern Java regex string - The pattern used to subscribe the topic. Only one of "subscribe" and "subscribePattern" + The pattern used to subscribe to topic(s). + Only one of "assign, "subscribe" or "subscribePattern" options can be specified for Kafka source. @@ -174,16 +183,21 @@ The following configurations are optional: Optionvaluedefaultmeaning - startingOffset - ["earliest", "latest"] - "latest" - The start point when a query is started, either "earliest" which is from the earliest offset, - or "latest" which is just from the latest offset. Note: This only applies when a new Streaming q - uery is started, and that resuming will always pick up from where the query left off. + startingOffsets + earliest, latest, or json string + {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} + + latest + The start point when a query is started, either "earliest" which is from the earliest offsets, + "latest" which is just from the latest offsets, or a json string specifying a starting offset for + each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. 
+ Note: This only applies when a new Streaming query is started, and that resuming will always pick + up from where the query left off. Newly discovered partitions during a query will start at + earliest. failOnDataLoss - [true, false] + true or false true Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work @@ -215,10 +229,10 @@ Kafka's own configurations can be set via `DataStreamReader.option` with `kafka. Note that the following Kafka params cannot be set and the Kafka source will throw an exception: - **group.id**: Kafka source will create a unique group id for each query automatically. -- **auto.offset.reset**: Set the source option `startingOffset` to `earliest` or `latest` to specify +- **auto.offset.reset**: Set the source option `startingOffsets` to specify where to start instead. Structured Streaming manages which offsets are
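A minimal structured-streaming sketch (not part of the commit) exercising the two additions above: the `assign` strategy and a JSON `startingOffsets` value. The broker address, topic name, and offsets are placeholders.

```scala
import org.apache.spark.sql.SparkSession

object StartingOffsetsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("starting-offsets-example")
      .getOrCreate()

    // Read two explicit partitions of topicA: partition 0 starting at offset 23,
    // partition 1 starting at the latest offset (-1). -2 would mean earliest.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")        // assumed broker address
      .option("assign", """{"topicA":[0,1]}""")
      .option("startingOffsets", """{"topicA":{"0":23,"1":-1}}""")
      .load()

    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```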
spark git commit: [SPARK-18044][STREAMING] FileStreamSource should not infer partitions in every batch
Repository: spark Updated Branches: refs/heads/master c1f344f1a -> 140570252 [SPARK-18044][STREAMING] FileStreamSource should not infer partitions in every batch ## What changes were proposed in this pull request? In `FileStreamSource.getBatch`, we will create a `DataSource` with specified schema, to avoid inferring the schema again and again. However, we don't pass the partition columns, and will infer the partition again and again. This PR fixes it by keeping the partition columns in `FileStreamSource`, like schema. ## How was this patch tested? N/A Author: Wenchen FanCloses #15581 from cloud-fan/stream. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14057025 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14057025 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14057025 Branch: refs/heads/master Commit: 140570252fd3739d6bdcadd6d4d5a180e480d3e0 Parents: c1f344f Author: Wenchen Fan Authored: Fri Oct 21 15:28:16 2016 -0700 Committer: Shixiong Zhu Committed: Fri Oct 21 15:28:16 2016 -0700 -- .../sql/execution/datasources/DataSource.scala | 26 ++-- .../execution/streaming/FileStreamSource.scala | 2 ++ .../streaming/FileStreamSourceSuite.scala | 2 +- 3 files changed, 21 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/14057025/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 92b1fff..17da606 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -75,7 +75,7 @@ case class DataSource( bucketSpec: Option[BucketSpec] = None, options: Map[String, String] = Map.empty) extends Logging { - case class SourceInfo(name: String, schema: StructType) + case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String]) lazy val providingClass: Class[_] = lookupDataSource(className) lazy val sourceInfo = sourceSchema() @@ -186,8 +186,11 @@ case class DataSource( } } - private def inferFileFormatSchema(format: FileFormat): StructType = { -userSpecifiedSchema.orElse { + /** + * Infer the schema of the given FileFormat, returns a pair of schema and partition column names. + */ + private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = { +userSpecifiedSchema.map(_ -> partitionColumns).orElse { val caseInsensitiveOptions = new CaseInsensitiveMap(options) val allPaths = caseInsensitiveOptions.get("path") val globbedPaths = allPaths.toSeq.flatMap { path => @@ -197,14 +200,14 @@ case class DataSource( SparkHadoopUtil.get.globPathIfNecessary(qualified) }.toArray val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None) - val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields + val partitionSchema = fileCatalog.partitionSpec().partitionColumns val inferred = format.inferSchema( sparkSession, caseInsensitiveOptions, fileCatalog.allFiles()) inferred.map { inferredSchema => -StructType(inferredSchema ++ partitionCols) +StructType(inferredSchema ++ partitionSchema) -> partitionSchema.map(_.name) } }.getOrElse { throw new AnalysisException("Unable to infer schema. 
It must be specified manually.") @@ -217,7 +220,7 @@ case class DataSource( case s: StreamSourceProvider => val (name, schema) = s.sourceSchema( sparkSession.sqlContext, userSpecifiedSchema, className, options) -SourceInfo(name, schema) +SourceInfo(name, schema, Nil) case format: FileFormat => val caseInsensitiveOptions = new CaseInsensitiveMap(options) @@ -246,7 +249,8 @@ case class DataSource( "you may be able to create a static DataFrame on that directory with " + "'spark.read.load(directory)' and infer schema from it.") } -SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format)) +val (schema, partCols) = inferFileFormatSchema(format) +SourceInfo(s"FileSource[$path]", schema, partCols) case _ => throw new UnsupportedOperationException( @@ -266,7 +270,13 @@ case class DataSource( throw new IllegalArgumentException("'path' is not specified")
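To make the scenario concrete, here is a sketch of a partitioned file stream of the kind this change speeds up. The directory layout and schema are assumptions; with the fix, the partition columns discovered for the source are kept alongside the schema instead of being re-inferred for every batch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object PartitionedFileStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("partitioned-file-stream")
      .getOrCreate()

    // Assume JSON files laid out as /tmp/events/date=2016-10-21/part-*.json,
    // so `date` is a partition column appended after the data columns.
    val schema = new StructType()
      .add("id", LongType)
      .add("payload", StringType)
      .add("date", StringType)

    val events = spark.readStream
      .schema(schema)
      .json("/tmp/events") // assumed input directory

    val query = events.groupBy("date").count()
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```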
spark git commit: [SPARK-17929][CORE] Fix deadlock when CoarseGrainedSchedulerBackend reset
Repository: spark Updated Branches: refs/heads/master 7a531e305 -> c1f344f1a [SPARK-17929][CORE] Fix deadlock when CoarseGrainedSchedulerBackend reset ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-17929 Now `CoarseGrainedSchedulerBackend` reset will get the lock, ``` protected def reset(): Unit = synchronized { numPendingExecutors = 0 executorsPendingToRemove.clear() // Remove all the lingering executors that should be removed but not yet. The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. executorDataMap.toMap.foreach { case (eid, _) => driverEndpoint.askWithRetry[Boolean]( RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))) } } ``` but on removeExecutor also need the lock "CoarseGrainedSchedulerBackend.this.synchronized", this will cause deadlock. ``` private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = { logDebug(s"Asked to remove executor $executorId with reason $reason") executorDataMap.get(executorId) match { case Some(executorInfo) => // This must be synchronized because variables mutated // in this block are read when requesting executors val killed = CoarseGrainedSchedulerBackend.this.synchronized { addressToExecutorId -= executorInfo.executorAddress executorDataMap -= executorId executorsPendingLossReason -= executorId executorsPendingToRemove.remove(executorId).getOrElse(false) } ... ## How was this patch tested? manual test. Author: w00228970Closes #15481 from scwf/spark-17929. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1f344f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1f344f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1f344f1 Branch: refs/heads/master Commit: c1f344f1a09b8834bec70c1ece30b9bff63e55ea Parents: 7a531e3 Author: w00228970 Authored: Fri Oct 21 14:43:55 2016 -0700 Committer: Shixiong Zhu Committed: Fri Oct 21 14:43:55 2016 -0700 -- .../cluster/CoarseGrainedSchedulerBackend.scala | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c1f344f1/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0dae0e6..10d55c8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -386,15 +386,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only * be called in the yarn-client mode when AM re-registers after a failure. * */ - protected def reset(): Unit = synchronized { -numPendingExecutors = 0 -executorsPendingToRemove.clear() + protected def reset(): Unit = { +val executors = synchronized { + numPendingExecutors = 0 + executorsPendingToRemove.clear() + Set() ++ executorDataMap.keys +} // Remove all the lingering executors that should be removed but not yet. The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. 
-executorDataMap.toMap.foreach { case (eid, _) => - driverEndpoint.askWithRetry[Boolean]( -RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))) +executors.foreach { eid => + removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17929][CORE] Fix deadlock when CoarseGrainedSchedulerBackend reset
Repository: spark Updated Branches: refs/heads/branch-2.0 af2e6e0c9 -> b113b5d9f [SPARK-17929][CORE] Fix deadlock when CoarseGrainedSchedulerBackend reset ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-17929 Now `CoarseGrainedSchedulerBackend` reset will get the lock, ``` protected def reset(): Unit = synchronized { numPendingExecutors = 0 executorsPendingToRemove.clear() // Remove all the lingering executors that should be removed but not yet. The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. executorDataMap.toMap.foreach { case (eid, _) => driverEndpoint.askWithRetry[Boolean]( RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))) } } ``` but on removeExecutor also need the lock "CoarseGrainedSchedulerBackend.this.synchronized", this will cause deadlock. ``` private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = { logDebug(s"Asked to remove executor $executorId with reason $reason") executorDataMap.get(executorId) match { case Some(executorInfo) => // This must be synchronized because variables mutated // in this block are read when requesting executors val killed = CoarseGrainedSchedulerBackend.this.synchronized { addressToExecutorId -= executorInfo.executorAddress executorDataMap -= executorId executorsPendingLossReason -= executorId executorsPendingToRemove.remove(executorId).getOrElse(false) } ... ## How was this patch tested? manual test. Author: w00228970Closes #15481 from scwf/spark-17929. (cherry picked from commit c1f344f1a09b8834bec70c1ece30b9bff63e55ea) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b113b5d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b113b5d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b113b5d9 Branch: refs/heads/branch-2.0 Commit: b113b5d9ff100385154ef0f836feb9805db163d2 Parents: af2e6e0 Author: w00228970 Authored: Fri Oct 21 14:43:55 2016 -0700 Committer: Shixiong Zhu Committed: Fri Oct 21 14:44:05 2016 -0700 -- .../cluster/CoarseGrainedSchedulerBackend.scala | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b113b5d9/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index c6b3fdf..2c173db 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -386,15 +386,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only * be called in the yarn-client mode when AM re-registers after a failure. * */ - protected def reset(): Unit = synchronized { -numPendingExecutors = 0 -executorsPendingToRemove.clear() + protected def reset(): Unit = { +val executors = synchronized { + numPendingExecutors = 0 + executorsPendingToRemove.clear() + Set() ++ executorDataMap.keys +} // Remove all the lingering executors that should be removed but not yet. 
The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. -executorDataMap.toMap.foreach { case (eid, _) => - driverEndpoint.askWithRetry[Boolean]( -RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))) +executors.foreach { eid => + removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
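The fix above follows a standard pattern: snapshot the shared state while holding the lock, then do the blocking work after releasing it. A distilled, self-contained sketch of that pattern is below; it is deliberately not the real Spark classes, whose removeExecutor work runs on an RPC thread.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

class Backend {
  private val executorDataMap = mutable.HashMap("exec-1" -> "data", "exec-2" -> "data")

  // In the real code this runs on another thread and needs the same monitor,
  // which is why reset() must not hold the lock while waiting on it.
  private def removeExecutor(id: String): Unit = synchronized { executorDataMap -= id }

  def reset(): Unit = {
    // Snapshot the keys under the lock...
    val executors = synchronized { Set() ++ executorDataMap.keys }
    // ...then wait for the removals outside it, so the other thread can acquire the lock.
    executors.foreach { id =>
      Await.result(Future(removeExecutor(id)), 10.seconds)
    }
  }
}

object ResetWithoutDeadlock {
  def main(args: Array[String]): Unit = {
    new Backend().reset()
    println("reset completed without blocking")
  }
}
```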
spark git commit: [SPARK-17926][SQL][STREAMING] Added json for statuses
Repository: spark Updated Branches: refs/heads/branch-2.0 78458a7eb -> af2e6e0c9 [SPARK-17926][SQL][STREAMING] Added json for statuses ## What changes were proposed in this pull request? StreamingQueryStatus exposed through StreamingQueryListener often needs to be recorded (similar to SparkListener events). This PR adds `.json` and `.prettyJson` to `StreamingQueryStatus`, `SourceStatus` and `SinkStatus`. ## How was this patch tested? New unit tests Author: Tathagata DasCloses #15476 from tdas/SPARK-17926. (cherry picked from commit 7a531e3054f8d4820216ed379433559f57f571b8) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af2e6e0c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af2e6e0c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af2e6e0c Branch: refs/heads/branch-2.0 Commit: af2e6e0c9c85c40bc505ed1183857a8fb60fbd72 Parents: 78458a7 Author: Tathagata Das Authored: Fri Oct 21 13:07:29 2016 -0700 Committer: Yin Huai Committed: Fri Oct 21 13:07:59 2016 -0700 -- python/pyspark/sql/streaming.py | 11 +- .../apache/spark/sql/streaming/SinkStatus.scala | 18 +++- .../spark/sql/streaming/SourceStatus.scala | 23 +++- .../sql/streaming/StreamingQueryStatus.scala| 55 +++--- .../streaming/StreamingQueryStatusSuite.scala | 105 +++ 5 files changed, 187 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/af2e6e0c/python/pyspark/sql/streaming.py -- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 0df63a7..cfe917b 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -205,8 +205,7 @@ class StreamingQueryStatus(object): Pretty string of this query status. >>> print(sqs) -StreamingQueryStatus: -Query name: query +Status of query 'query' Query id: 1 Status timestamp: 123 Input rate: 15.5 rows/sec @@ -220,7 +219,7 @@ class StreamingQueryStatus(object): numRows.input.total: 100 triggerId: 5 Source statuses [1 source]: -Source 1:MySource1 +Source 1 - MySource1 Available offset: #0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec @@ -228,7 +227,7 @@ class StreamingQueryStatus(object): numRows.input.source: 100 latency.getOffset.source: 10 latency.getBatch.source: 20 -Sink status: MySink +Sink status - MySink Committed offsets: [#1, -] """ return self._jsqs.toString() @@ -366,7 +365,7 @@ class SourceStatus(object): Pretty string of this source status. >>> print(sqs.sourceStatuses[0]) -SourceStatus:MySource1 +Status of source MySource1 Available offset: #0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec @@ -457,7 +456,7 @@ class SinkStatus(object): Pretty string of this source status. 
>>> print(sqs.sinkStatus) -SinkStatus:MySink +Status of sink MySink Committed offsets: [#1, -] """ return self._jss.toString() http://git-wip-us.apache.org/repos/asf/spark/blob/af2e6e0c/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala index c991166..ab19602 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala @@ -17,6 +17,11 @@ package org.apache.spark.sql.streaming +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.streaming.StreamingQueryStatus.indent @@ -34,8 +39,19 @@ class SinkStatus private( val description: String, val offsetDesc: String) { + /** The compact JSON representation of this status. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this status. */ + def prettyJson: String = pretty(render(jsonValue)) + override def toString:
spark git commit: [SPARK-17926][SQL][STREAMING] Added json for statuses
Repository: spark Updated Branches: refs/heads/master e371040a0 -> 7a531e305 [SPARK-17926][SQL][STREAMING] Added json for statuses ## What changes were proposed in this pull request? StreamingQueryStatus exposed through StreamingQueryListener often needs to be recorded (similar to SparkListener events). This PR adds `.json` and `.prettyJson` to `StreamingQueryStatus`, `SourceStatus` and `SinkStatus`. ## How was this patch tested? New unit tests Author: Tathagata DasCloses #15476 from tdas/SPARK-17926. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a531e30 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a531e30 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a531e30 Branch: refs/heads/master Commit: 7a531e3054f8d4820216ed379433559f57f571b8 Parents: e371040 Author: Tathagata Das Authored: Fri Oct 21 13:07:29 2016 -0700 Committer: Yin Huai Committed: Fri Oct 21 13:07:29 2016 -0700 -- python/pyspark/sql/streaming.py | 11 +- .../apache/spark/sql/streaming/SinkStatus.scala | 18 +++- .../spark/sql/streaming/SourceStatus.scala | 23 +++- .../sql/streaming/StreamingQueryStatus.scala| 55 +++--- .../streaming/StreamingQueryStatusSuite.scala | 105 +++ 5 files changed, 187 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7a531e30/python/pyspark/sql/streaming.py -- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index ce47bd1..35fc469 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -205,8 +205,7 @@ class StreamingQueryStatus(object): Pretty string of this query status. >>> print(sqs) -StreamingQueryStatus: -Query name: query +Status of query 'query' Query id: 1 Status timestamp: 123 Input rate: 15.5 rows/sec @@ -220,7 +219,7 @@ class StreamingQueryStatus(object): numRows.input.total: 100 triggerId: 5 Source statuses [1 source]: -Source 1:MySource1 +Source 1 - MySource1 Available offset: #0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec @@ -228,7 +227,7 @@ class StreamingQueryStatus(object): numRows.input.source: 100 latency.getOffset.source: 10 latency.getBatch.source: 20 -Sink status: MySink +Sink status - MySink Committed offsets: [#1, -] """ return self._jsqs.toString() @@ -366,7 +365,7 @@ class SourceStatus(object): Pretty string of this source status. >>> print(sqs.sourceStatuses[0]) -SourceStatus:MySource1 +Status of source MySource1 Available offset: #0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec @@ -457,7 +456,7 @@ class SinkStatus(object): Pretty string of this source status. 
>>> print(sqs.sinkStatus) -SinkStatus:MySink +Status of sink MySink Committed offsets: [#1, -] """ return self._jss.toString() http://git-wip-us.apache.org/repos/asf/spark/blob/7a531e30/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala index c991166..ab19602 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala @@ -17,6 +17,11 @@ package org.apache.spark.sql.streaming +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.streaming.StreamingQueryStatus.indent @@ -34,8 +39,19 @@ class SinkStatus private( val description: String, val offsetDesc: String) { + /** The compact JSON representation of this status. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this status. */ + def prettyJson: String = pretty(render(jsonValue)) + override def toString: String = -"SinkStatus:" + indent(prettyString) +"Status of sink " + indent(prettyString).trim + + private[sql] def
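A small helper sketch showing the surface area this commit adds: a status object can now be serialized to compact or pretty JSON for logging or persistence. How the `StreamingQueryStatus` instance is obtained (for example from a listener callback) is left out here, since that part of the API was still evolving at the time.

```scala
import org.apache.spark.sql.streaming.StreamingQueryStatus

object StatusLogging {
  // Record one snapshot of a streaming query's status.
  def record(status: StreamingQueryStatus): Unit = {
    println(status.json)       // compact JSON, one line per snapshot
    println(status.prettyJson) // indented JSON, easier to read in logs
  }
}
```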
spark git commit: [SPARK-17811] SparkR cannot parallelize data.frame with NA or NULL in Date columns
Repository: spark Updated Branches: refs/heads/branch-2.0 a65d40ab6 -> 78458a7eb [SPARK-17811] SparkR cannot parallelize data.frame with NA or NULL in Date columns ## What changes were proposed in this pull request? NA date values are serialized as "NA" and NA time values are serialized as NaN from R. In the backend we did not have proper logic to deal with them. As a result we got an IllegalArgumentException for Date and wrong value for time. This PR adds support for deserializing NA as Date and Time. ## How was this patch tested? * [x] TODO Author: HosseinCloses #15421 from falaki/SPARK-17811. (cherry picked from commit e371040a0150e4ed748a7c25465965840b61ca63) Signed-off-by: Felix Cheung Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78458a7e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78458a7e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78458a7e Branch: refs/heads/branch-2.0 Commit: 78458a7ebeba6758890b01cc2b7417ab2fda221e Parents: a65d40a Author: Hossein Authored: Fri Oct 21 12:38:52 2016 -0700 Committer: Felix Cheung Committed: Fri Oct 21 12:45:35 2016 -0700 -- R/pkg/inst/tests/testthat/test_sparkSQL.R | 13 .../scala/org/apache/spark/api/r/SerDe.scala| 31 2 files changed, 38 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/78458a7e/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index d33fcde..b7b9de7 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -373,6 +373,19 @@ test_that("create DataFrame with different data types", { expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE)) }) +test_that("SPARK-17811: can create DataFrame containing NA as date and time", { + df <- data.frame( +id = 1:2, +time = c(as.POSIXlt("2016-01-10"), NA), +date = c(as.Date("2016-10-01"), NA)) + + DF <- collect(createDataFrame(df)) + expect_true(is.na(DF$date[2])) + expect_equal(DF$date[1], as.Date("2016-10-01")) + expect_true(is.na(DF$time[2])) + expect_equal(DF$time[1], as.POSIXlt("2016-01-10")) +}) + test_that("create DataFrame with complex types", { e <- new.env() assign("n", 3L, envir = e) http://git-wip-us.apache.org/repos/asf/spark/blob/78458a7e/core/src/main/scala/org/apache/spark/api/r/SerDe.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index e4932a4..550e075 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -125,15 +125,34 @@ private[spark] object SerDe { } def readDate(in: DataInputStream): Date = { -Date.valueOf(readString(in)) +try { + val inStr = readString(in) + if (inStr == "NA") { +null + } else { +Date.valueOf(inStr) + } +} catch { + // TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE + case _: NegativeArraySizeException => null +} } def readTime(in: DataInputStream): Timestamp = { -val seconds = in.readDouble() -val sec = Math.floor(seconds).toLong -val t = new Timestamp(sec * 1000L) -t.setNanos(((seconds - sec) * 1e9).toInt) -t +try { + val seconds = in.readDouble() + if (java.lang.Double.isNaN(seconds)) { +null + } else { +val sec = Math.floor(seconds).toLong +val t = new Timestamp(sec * 1000L) +t.setNanos(((seconds - sec) * 1e9).toInt) +t + } +} catch { + // TODO: SPARK-18011 with some versions of R 
deserializing NA from R results in NASE + case _: NegativeArraySizeException => null +} } def readBytesArr(in: DataInputStream): Array[Array[Byte]] = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17811] SparkR cannot parallelize data.frame with NA or NULL in Date columns
Repository: spark Updated Branches: refs/heads/master e21e1c946 -> e371040a0 [SPARK-17811] SparkR cannot parallelize data.frame with NA or NULL in Date columns ## What changes were proposed in this pull request? NA date values are serialized as "NA" and NA time values are serialized as NaN from R. In the backend we did not have proper logic to deal with them. As a result we got an IllegalArgumentException for Date and wrong value for time. This PR adds support for deserializing NA as Date and Time. ## How was this patch tested? * [x] TODO Author: HosseinCloses #15421 from falaki/SPARK-17811. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e371040a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e371040a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e371040a Branch: refs/heads/master Commit: e371040a0150e4ed748a7c25465965840b61ca63 Parents: e21e1c9 Author: Hossein Authored: Fri Oct 21 12:38:52 2016 -0700 Committer: Felix Cheung Committed: Fri Oct 21 12:38:52 2016 -0700 -- R/pkg/inst/tests/testthat/test_sparkSQL.R | 13 .../scala/org/apache/spark/api/r/SerDe.scala| 31 2 files changed, 38 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e371040a/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 3a987cd..b4b43fd 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -390,6 +390,19 @@ test_that("create DataFrame with different data types", { expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE)) }) +test_that("SPARK-17811: can create DataFrame containing NA as date and time", { + df <- data.frame( +id = 1:2, +time = c(as.POSIXlt("2016-01-10"), NA), +date = c(as.Date("2016-10-01"), NA)) + + DF <- collect(createDataFrame(df)) + expect_true(is.na(DF$date[2])) + expect_equal(DF$date[1], as.Date("2016-10-01")) + expect_true(is.na(DF$time[2])) + expect_equal(DF$time[1], as.POSIXlt("2016-01-10")) +}) + test_that("create DataFrame with complex types", { e <- new.env() assign("n", 3L, envir = e) http://git-wip-us.apache.org/repos/asf/spark/blob/e371040a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index e4932a4..550e075 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -125,15 +125,34 @@ private[spark] object SerDe { } def readDate(in: DataInputStream): Date = { -Date.valueOf(readString(in)) +try { + val inStr = readString(in) + if (inStr == "NA") { +null + } else { +Date.valueOf(inStr) + } +} catch { + // TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE + case _: NegativeArraySizeException => null +} } def readTime(in: DataInputStream): Timestamp = { -val seconds = in.readDouble() -val sec = Math.floor(seconds).toLong -val t = new Timestamp(sec * 1000L) -t.setNanos(((seconds - sec) * 1e9).toInt) -t +try { + val seconds = in.readDouble() + if (java.lang.Double.isNaN(seconds)) { +null + } else { +val sec = Math.floor(seconds).toLong +val t = new Timestamp(sec * 1000L) +t.setNanos(((seconds - sec) * 1e9).toInt) +t + } +} catch { + // TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE + case _: NegativeArraySizeException => null +} } def 
readBytesArr(in: DataInputStream): Array[Array[Byte]] = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
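The patched methods above live in Spark's R SerDe. Below is a distilled, dependency-free sketch of the two null-handling rules it adds; it is not the real SerDe and takes already-decoded values instead of reading a DataInputStream. The string "NA" becomes a null Date, and a NaN seconds value becomes a null Timestamp.

```scala
import java.sql.{Date, Timestamp}

object RNullValueSketch {
  def parseDate(s: String): Date =
    if (s == "NA") null else Date.valueOf(s)

  def parseTime(seconds: Double): Timestamp =
    if (java.lang.Double.isNaN(seconds)) {
      null
    } else {
      val sec = math.floor(seconds).toLong
      val t = new Timestamp(sec * 1000L)
      t.setNanos(((seconds - sec) * 1e9).toInt)
      t
    }

  def main(args: Array[String]): Unit = {
    assert(parseDate("NA") == null)       // NA date from R -> null
    assert(parseTime(Double.NaN) == null) // NA time from R arrives as NaN -> null
    println(parseDate("2016-10-01"))
    println(parseTime(1477100000.5))      // fractional seconds become nanos
  }
}
```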
spark git commit: [SPARK-18013][SPARKR] add crossJoin API
Repository: spark Updated Branches: refs/heads/master 4efdc764e -> e21e1c946 [SPARK-18013][SPARKR] add crossJoin API ## What changes were proposed in this pull request? Add crossJoin and do not default to cross join if joinExpr is left out ## How was this patch tested? unit test Author: Felix CheungCloses #15559 from felixcheung/rcrossjoin. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e21e1c94 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e21e1c94 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e21e1c94 Branch: refs/heads/master Commit: e21e1c946c4b7448fb150cfa2d9419864ae6f9b5 Parents: 4efdc76 Author: Felix Cheung Authored: Fri Oct 21 12:35:37 2016 -0700 Committer: Felix Cheung Committed: Fri Oct 21 12:35:37 2016 -0700 -- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 59 -- R/pkg/R/generics.R| 4 ++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 11 - docs/sparkr.md| 4 ++ 5 files changed, 64 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 5960c62..8718185 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -71,6 +71,7 @@ exportMethods("arrange", "covar_samp", "covar_pop", "createOrReplaceTempView", + "crossJoin", "crosstab", "dapply", "dapplyCollect", http://git-wip-us.apache.org/repos/asf/spark/blob/e21e1c94/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 801d2ed..8910a4b 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2271,12 +2271,13 @@ setMethod("dropDuplicates", #' Join #' -#' Join two SparkDataFrames based on the given join expression. +#' Joins two SparkDataFrames based on the given join expression. #' #' @param x A SparkDataFrame #' @param y A SparkDataFrame #' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a -#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join +#' Column expression. If joinExpr is omitted, the default, inner join is attempted and an error is +#' thrown if it would be a Cartesian Product. For Cartesian join, use crossJoin instead. #' @param joinType The type of join to perform. The following join types are available: #' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left_outer', 'left', #' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner". @@ -2285,23 +2286,24 @@ setMethod("dropDuplicates", #' @aliases join,SparkDataFrame,SparkDataFrame-method #' @rdname join #' @name join -#' @seealso \link{merge} +#' @seealso \link{merge} \link{crossJoin} #' @export #' @examples #'\dontrun{ #' sparkR.session() #' df1 <- read.json(path) #' df2 <- read.json(path2) -#' join(df1, df2) # Performs a Cartesian #' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression #' join(df1, df2, df1$col1 == df2$col2, "right_outer") +#' join(df1, df2) # Attempts an inner join #' } #' @note join since 1.4.0 setMethod("join", signature(x = "SparkDataFrame", y = "SparkDataFrame"), function(x, y, joinExpr = NULL, joinType = NULL) { if (is.null(joinExpr)) { - sdf <- callJMethod(x@sdf, "crossJoin", y@sdf) + # this may not fail until the planner checks for Cartesian join later on. 
+ sdf <- callJMethod(x@sdf, "join", y@sdf) } else { if (class(joinExpr) != "Column") stop("joinExpr must be a Column") if (is.null(joinType)) { @@ -2322,22 +2324,52 @@ setMethod("join", dataFrame(sdf) }) +#' CrossJoin +#' +#' Returns Cartesian Product on two SparkDataFrames. +#' +#' @param x A SparkDataFrame +#' @param y A SparkDataFrame +#' @return A SparkDataFrame containing the result of the join operation. +#' @family SparkDataFrame functions +#' @aliases crossJoin,SparkDataFrame,SparkDataFrame-method +#' @rdname crossJoin +#' @name crossJoin +#' @seealso \link{merge} \link{join} +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' df1 <- read.json(path) +#' df2 <- read.json(path2) +#' crossJoin(df1, df2) # Performs a Cartesian +#' } +#' @note crossJoin since 2.1.0 +setMethod("crossJoin", + signature(x = "SparkDataFrame", y = "SparkDataFrame"), +
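The new R `crossJoin` wrapper calls the JVM `crossJoin` method on the underlying Dataset, so the same distinction exists in Scala. A short sketch with made-up toy data: `crossJoin` is the explicit Cartesian product, while `join` without a predicate is no longer a silent Cartesian and may be rejected by the planner.

```scala
import org.apache.spark.sql.SparkSession

object CrossJoinExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("cross-join-example")
      .getOrCreate()
    import spark.implicits._

    val left = Seq((1, "a"), (2, "b")).toDF("id", "l")
    val right = Seq((10, "x"), (20, "y")).toDF("key", "r")

    // Explicit Cartesian product: 2 x 2 = 4 rows.
    left.crossJoin(right).show()

    // Inner join with a predicate; this is the recommended form when a
    // Cartesian product is not actually intended.
    left.join(right, left("id") === right("key")).show()

    spark.stop()
  }
}
```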
spark git commit: [SPARK-17674][SPARKR] check for warning in test output
Repository: spark Updated Branches: refs/heads/master b3b4b9542 -> 4efdc764e [SPARK-17674][SPARKR] check for warning in test output ## What changes were proposed in this pull request? testthat library we are using for testing R is redirecting warning (and disabling `options("warn" = 2)`), we need to have a way to detect any new warning and fail ## How was this patch tested? manual testing, Jenkins Author: Felix CheungCloses #15576 from felixcheung/rtestwarning. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4efdc764 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4efdc764 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4efdc764 Branch: refs/heads/master Commit: 4efdc764edfbc4971f0e863947258482ca2017df Parents: b3b4b95 Author: Felix Cheung Authored: Fri Oct 21 12:34:14 2016 -0700 Committer: Felix Cheung Committed: Fri Oct 21 12:34:14 2016 -0700 -- R/run-tests.sh | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4efdc764/R/run-tests.sh -- diff --git a/R/run-tests.sh b/R/run-tests.sh index 1a1e8ab..5e4dafa 100755 --- a/R/run-tests.sh +++ b/R/run-tests.sh @@ -26,6 +26,8 @@ rm -f $LOGFILE SPARK_TESTING=1 $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.default.name="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE FAILED=$((PIPESTATUS[0]||$FAILED)) +NUM_TEST_WARNING="$(grep -c -e 'Warnings ' $LOGFILE)" + # Also run the documentation tests for CRAN CRAN_CHECK_LOG_FILE=$FWDIR/cran-check.out rm -f $CRAN_CHECK_LOG_FILE @@ -37,10 +39,10 @@ NUM_CRAN_WARNING="$(grep -c WARNING$ $CRAN_CHECK_LOG_FILE)" NUM_CRAN_ERROR="$(grep -c ERROR$ $CRAN_CHECK_LOG_FILE)" NUM_CRAN_NOTES="$(grep -c NOTE$ $CRAN_CHECK_LOG_FILE)" -if [[ $FAILED != 0 ]]; then +if [[ $FAILED != 0 || $NUM_TEST_WARNING != 0 ]]; then cat $LOGFILE echo -en "\033[31m" # Red -echo "Had test failures; see logs." +echo "Had test warnings or failures; see logs." echo -en "\033[0m" # No color exit -1 else - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness
Repository: spark Updated Branches: refs/heads/branch-2.0 6cc6cb2a9 -> a65d40ab6 [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness We should upgrade to the latest release of MiMa (0.1.11) in order to include a fix for a bug which led to flakiness in the MiMa checks (https://github.com/typesafehub/migration-manager/issues/115). Author: Josh RosenCloses #15571 from JoshRosen/SPARK-18034. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a65d40ab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a65d40ab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a65d40ab Branch: refs/heads/branch-2.0 Commit: a65d40ab63fecc993136a98b8a820d2a8893a9ba Parents: 6cc6cb2 Author: Josh Rosen Authored: Fri Oct 21 11:25:01 2016 -0700 Committer: Josh Rosen Committed: Fri Oct 21 11:28:49 2016 -0700 -- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a65d40ab/project/plugins.sbt -- diff --git a/project/plugins.sbt b/project/plugins.sbt index 8bebd7b..76597d2 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,7 +6,7 @@ addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2") addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0") -addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.9") +addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.11") addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.1") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness
Repository: spark Updated Branches: refs/heads/master 3a237512b -> b3b4b9542 [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness We should upgrade to the latest release of MiMa (0.1.11) in order to include a fix for a bug which led to flakiness in the MiMa checks (https://github.com/typesafehub/migration-manager/issues/115). Author: Josh RosenCloses #15571 from JoshRosen/SPARK-18034. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3b4b954 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3b4b954 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3b4b954 Branch: refs/heads/master Commit: b3b4b9542223de3495a7a7e0dd27634ddb9f929d Parents: 3a23751 Author: Josh Rosen Authored: Fri Oct 21 11:25:01 2016 -0700 Committer: Josh Rosen Committed: Fri Oct 21 11:25:01 2016 -0700 -- project/MimaExcludes.scala | 7 ++- project/plugins.sbt| 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b3b4b954/project/MimaExcludes.scala -- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index facf034..350b144 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -81,7 +81,12 @@ object MimaExcludes { // [SPARK-17338][SQL] add global temp view ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.catalog.Catalog.dropTempView"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropTempView") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropTempView"), + + // [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness. + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"), + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"), + ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=") ) } http://git-wip-us.apache.org/repos/asf/spark/blob/b3b4b954/project/plugins.sbt -- diff --git a/project/plugins.sbt b/project/plugins.sbt index 8bebd7b..76597d2 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,7 +6,7 @@ addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2") addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0") -addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.9") +addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.11") addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.1") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-13275][WEB UI] Visually clarified executors start time in timeline
Repository: spark Updated Branches: refs/heads/master a8ea4da8d -> 3a237512b

[SPARK-13275][WEB UI] Visually clarified executors start time in timeline

## What changes were proposed in this pull request?

Updated the Executors added/removed bubble in the timeline so it's clearer where it starts. Now the bubble is left-justified on the start time (still also denoted by the line) rather than center-justified.

## How was this patch tested?

Manually tested UI. Screenshots:
https://cloud.githubusercontent.com/assets/13952758/19496563/e6c9186e-953c-11e6-85e4-63309a553f65.png
https://cloud.githubusercontent.com/assets/13952758/19496568/e9f06132-953c-11e6-8901-54405ebc7f5b.png

Author: Alex Bozarth

Closes #15536 from ajbozarth/spark13275.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a237512
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a237512
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a237512

Branch: refs/heads/master
Commit: 3a237512b162d192b5503c08d121134a2dac6ff1
Parents: a8ea4da
Author: Alex Bozarth
Authored: Fri Oct 21 11:39:32 2016 +0100
Committer: Sean Owen
Committed: Fri Oct 21 11:39:32 2016 +0100
--
 .../src/main/resources/org/apache/spark/ui/static/timeline-view.js | 2 ++
 1 file changed, 2 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3a237512/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
--
diff --git a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
index a6153ce..705a08f 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
@@ -24,6 +24,7 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime, offset)
       return a.value - b.value
     },
     editable: false,
+    align: 'left',
     showCurrentTime: false,
     min: startTime,
     zoomable: false,
@@ -99,6 +100,7 @@ function drawJobTimeline(groupArray, eventObjArray, startTime, offset) {
       return a.value - b.value;
     },
     editable: false,
+    align: 'left',
     showCurrentTime: false,
     min: startTime,
     zoomable: false,
--

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17960][PYSPARK][UPGRADE TO PY4J 0.10.4]
Repository: spark Updated Branches: refs/heads/master 57e97fcbd -> 595893d33

[SPARK-17960][PYSPARK][UPGRADE TO PY4J 0.10.4]

## What changes were proposed in this pull request?

1) Upgrade the Py4J version on the Java side
2) Update the py4j src zip file we bundle with Spark

## How was this patch tested?

Existing doctests & unit tests pass

Author: Jagadeesan

Closes #15514 from jagadeesanas2/SPARK-17960.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/595893d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/595893d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/595893d3

Branch: refs/heads/master
Commit: 595893d33a26c838c8c5c0c599fbee7fa61cbdff
Parents: 57e97fc
Author: Jagadeesan
Authored: Fri Oct 21 09:48:24 2016 +0100
Committer: Sean Owen
Committed: Fri Oct 21 09:48:24 2016 +0100
--
 LICENSE                                            |   2 +-
 bin/pyspark                                        |   2 +-
 bin/pyspark2.cmd                                   |   2 +-
 core/pom.xml                                       |   2 +-
 .../org/apache/spark/api/python/PythonUtils.scala |   2 +-
 dev/deps/spark-deps-hadoop-2.2                     |   2 +-
 dev/deps/spark-deps-hadoop-2.3                     |   2 +-
 dev/deps/spark-deps-hadoop-2.4                     |   2 +-
 dev/deps/spark-deps-hadoop-2.6                     |   2 +-
 dev/deps/spark-deps-hadoop-2.7                     |   2 +-
 python/docs/Makefile                               |   2 +-
 python/lib/py4j-0.10.3-src.zip                     | Bin 91275 -> 0 bytes
 python/lib/py4j-0.10.4-src.zip                     | Bin 0 -> 74096 bytes
 sbin/spark-config.sh                               |   2 +-
 .../org/apache/spark/deploy/yarn/Client.scala      |   2 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala       |   2 +-
 16 files changed, 14 insertions(+), 14 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/595893d3/LICENSE
--
diff --git a/LICENSE b/LICENSE
index d68609c..7950dd6 100644
--- a/LICENSE
+++ b/LICENSE
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
      (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
      (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-     (The New BSD License) Py4J (net.sf.py4j:py4j:0.10.3 - http://py4j.sourceforge.net/)
+     (The New BSD License) Py4J (net.sf.py4j:py4j:0.10.4 - http://py4j.sourceforge.net/)
      (Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
      (BSD licence) sbt and sbt-launch-lib.bash
      (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)

http://git-wip-us.apache.org/repos/asf/spark/blob/595893d3/bin/pyspark
--
diff --git a/bin/pyspark b/bin/pyspark
index 7590309..d6b3ab0 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -57,7 +57,7 @@ export PYSPARK_PYTHON

 # Add the PySpark classes to the Python path:
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.3-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"

http://git-wip-us.apache.org/repos/asf/spark/blob/595893d3/bin/pyspark2.cmd
--
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 1217a4f..f211c08 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
 )

 set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.3-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.4-src.zip;%PYTHONPATH%

 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py

http://git-wip-us.apache.org/repos/asf/spark/blob/595893d3/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 205bbc5..eac99ab 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -331,7 +331,7 @@
       net.sf.py4j
       py4j
-      0.10.3
+      0.10.4
       org.apache.spark
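The same Py4J version string is duplicated in the launcher scripts, the YARN client, the dependency manifests, and the JVM-side helper that builds PySpark's PYTHONPATH, which is why a one-version bump touches sixteen files. A hedged sketch of the JVM-side path construction follows; the object, method, and "pyspark.zip" entry are illustrative, written in the spirit of PythonUtils.sparkPythonPath rather than copied from Spark's code.

```scala
import java.io.File

object Py4jPathSketch {
  // The bundled zip's file name embeds the Py4J version, so this constant must be
  // updated in lock step with python/lib/ and the bin/pyspark* scripts.
  private val py4jZip = "py4j-0.10.4-src.zip"

  /** Build the PYTHONPATH entries PySpark needs, given SPARK_HOME. */
  def sparkPythonPath(sparkHome: String): String = {
    val entries = Seq(
      Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator),
      Seq(sparkHome, "python", "lib", py4jZip).mkString(File.separator))
    entries.mkString(File.pathSeparator)
  }
}
```

For example, `Py4jPathSketch.sparkPythonPath("/opt/spark")` yields the two zip paths joined with the platform path separator, mirroring what the shell and cmd launchers above do by hand.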
spark git commit: [SPARK-17331][FOLLOWUP][ML][CORE] Avoid allocating 0-length arrays
Repository: spark Updated Branches: refs/heads/master 595893d33 -> a8ea4da8d

[SPARK-17331][FOLLOWUP][ML][CORE] Avoid allocating 0-length arrays

## What changes were proposed in this pull request?

`Array[T]()` -> `Array.empty[T]` to avoid allocating 0-length arrays.
Use regex `find . -name '*.scala' | xargs -i bash -c 'egrep "Array\[[A-Za-z]+\]\(\)" -n {} && echo {}'` to find modification candidates.

cc srowen

## How was this patch tested?

existing tests

Author: Zheng RuiFeng

Closes #15564 from zhengruifeng/avoid_0_length_array.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8ea4da8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8ea4da8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8ea4da8

Branch: refs/heads/master
Commit: a8ea4da8d04c1ed621a96668118f20739145edd2
Parents: 595893d
Author: Zheng RuiFeng
Authored: Fri Oct 21 09:49:37 2016 +0100
Committer: Sean Owen
Committed: Fri Oct 21 09:49:37 2016 +0100
--
 .../org/apache/spark/CheckpointSuite.scala       |  2 +-
 .../apache/spark/deploy/JsonProtocolSuite.scala  |  2 +-
 .../apache/spark/deploy/SparkSubmitSuite.scala   |  2 +-
 .../history/HistoryServerArgumentsSuite.scala    |  2 +-
 .../spark/io/ChunkedByteBufferSuite.scala        |  4 ++--
 .../spark/serializer/KryoSerializerSuite.scala   |  2 +-
 .../apache/spark/ml/linalg/MatricesSuite.scala   |  4 ++--
 .../spark/ml/util/TestingUtilsSuite.scala        | 24 ++--
 .../ml/classification/LogisticRegression.scala   |  2 +-
 .../mllib/stat/test/KolmogorovSmirnovTest.scala  |  3 ++-
 .../MultilayerPerceptronClassifierSuite.scala    |  2 +-
 .../apache/spark/ml/python/MLSerDeSuite.scala    |  2 +-
 .../spark/ml/tree/impl/RandomForestSuite.scala   |  4 ++--
 .../mllib/api/python/PythonMLLibAPISuite.scala   |  2 +-
 .../mllib/evaluation/RankingMetricsSuite.scala   |  4 ++--
 .../spark/mllib/linalg/MatricesSuite.scala       |  4 ++--
 .../spark/mllib/util/TestingUtilsSuite.scala     | 24 ++--
 .../expressions/StringExpressionsSuite.scala     | 10
 .../spark/sql/DataFrameFunctionsSuite.scala      |  2 +-
 19 files changed, 51 insertions(+), 50 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a8ea4da8/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 9f94e36..b117c77 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -500,7 +500,7 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
   }

   runTest("CheckpointRDD with zero partitions") { reliableCheckpoint: Boolean =>
-    val rdd = new BlockRDD[Int](sc, Array[BlockId]())
+    val rdd = new BlockRDD[Int](sc, Array.empty[BlockId])
     assert(rdd.partitions.size === 0)
     assert(rdd.isCheckpointed === false)
     assert(rdd.isCheckpointedAndMaterialized === false)

http://git-wip-us.apache.org/repos/asf/spark/blob/a8ea4da8/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 2d48e75..7093dad 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -65,7 +65,7 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
   test("writeMasterState") {
     val workers = Array(createWorkerInfo(), createWorkerInfo())
     val activeApps = Array(createAppInfo())
-    val completedApps = Array[ApplicationInfo]()
+    val completedApps = Array.empty[ApplicationInfo]
     val activeDrivers = Array(createDriverInfo())
     val completedDrivers = Array(createDriverInfo())
     val stateResponse = new MasterStateResponse(

http://git-wip-us.apache.org/repos/asf/spark/blob/a8ea4da8/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 732cbfa..7c649e3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -91,7 +91,7 @@ class SparkSubmitSuite
   // scalastyle:off println
   test("prints usage on empty
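As a quick illustration of the pattern being standardized (a minimal sketch, not taken from the patch itself): `Array[T]()` is sugar for `Array.apply[T]()` with an empty varargs sequence, whereas `Array.empty[T]` constructs the empty array directly, which is the spelling this follow-up prefers.

```scala
object EmptyArraySketch {
  def main(args: Array[String]): Unit = {
    val before: Array[String] = Array[String]()      // old spelling: goes through Array.apply with empty varargs
    val after: Array[String] = Array.empty[String]   // preferred spelling after this follow-up

    // Both are empty and behave identically for callers; only the construction path differs.
    assert(before.isEmpty && after.isEmpty)
    assert(before.sameElements(after))
  }
}
```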