spark git commit: [SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable (branch 2.0)

2016-09-23 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5bc5b49fa -> 9e91a1009


[SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size 
configurable (branch 2.0)

## What changes were proposed in this pull request?

Backport #14269 to 2.0.

## How was this patch tested?

Jenkins.

Author: Dhruve Ashar 

Closes #15222 from zsxwing/SPARK-15703-2.0.
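A minimal usage sketch for the new setting (the key and its default come from the `package.scala` hunk below; the capacity value here is purely illustrative):

```
import org.apache.spark.{SparkConf, SparkContext}

// The queue capacity must be set before the SparkContext (and thus the
// LiveListenerBus) is created; 20000 is an arbitrary example value.
val conf = new SparkConf()
  .setAppName("listener-bus-queue-demo")
  .setMaster("local[*]")
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000")

val sc = new SparkContext(conf)
// ... run jobs; a larger queue lowers the chance of dropping listener events
// when they are produced faster than listeners can consume them ...
sc.stop()
```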


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e91a100
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e91a100
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e91a100

Branch: refs/heads/branch-2.0
Commit: 9e91a1009e6f916245b4d4018de1664ea3decfe7
Parents: 5bc5b49
Author: Dhruve Ashar 
Authored: Fri Sep 23 14:59:53 2016 -0700
Committer: Shixiong Zhu 
Committed: Fri Sep 23 14:59:53 2016 -0700

--
 .../scala/org/apache/spark/SparkContext.scala   |  4 +--
 .../apache/spark/internal/config/package.scala  |  5 
 .../spark/scheduler/LiveListenerBus.scala   | 23 +--
 .../scheduler/EventLoggingListenerSuite.scala   |  4 +--
 .../spark/scheduler/SparkListenerSuite.scala| 30 +++-
 .../storage/BlockManagerReplicationSuite.scala  |  9 --
 .../spark/storage/BlockManagerSuite.scala   |  6 ++--
 .../spark/ui/storage/StorageTabSuite.scala  | 11 +++
 .../streaming/ReceivedBlockHandlerSuite.scala   |  5 +++-
 9 files changed, 60 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 251c16f..ffd1227 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
   def isStopped: Boolean = stopped.get()
 
   // An asynchronous listener bus for Spark events
-  private[spark] val listenerBus = new LiveListenerBus
+  private[spark] val listenerBus = new LiveListenerBus(this)
 
   // This function allows components created by SparkEnv to be mocked in unit 
tests:
   private[spark] def createSparkEnv(
@@ -2154,7 +2154,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
 }
 }
 
-listenerBus.start(this)
+listenerBus.start()
 _listenerBusStarted = true
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f28a9a5..29f812a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -112,4 +112,9 @@ package object config {
   // To limit how many applications are shown in the History Server summary ui
   private[spark] val HISTORY_UI_MAX_APPS =
 
ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
+ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
+  .intConf
+  .createWithDefault(10000)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 1c21313..bfa3c40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.util.DynamicVariable
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -32,18 +33,24 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. 
This listener bus
  * is stopped when `stop()` is called, and it will drop further events after 
stopping.
  */
-private[spark] class LiveListenerBus extends SparkListenerBus {
+private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends 

[1/2] spark git commit: Preparing Spark release v2.0.1-rc3

2016-09-23 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b111a81f2 -> 5bc5b49fa


Preparing Spark release v2.0.1-rc3


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d28cc10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d28cc10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d28cc10

Branch: refs/heads/branch-2.0
Commit: 9d28cc10357a8afcfb2fa2e6eecb5c2cc2730d17
Parents: b111a81
Author: Patrick Wendell 
Authored: Fri Sep 23 14:38:07 2016 -0700
Committer: Patrick Wendell 
Committed: Fri Sep 23 14:38:07 2016 -0700

--
 R/pkg/DESCRIPTION | 2 +-
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 35 files changed, 35 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 5a83883..3e49eac 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R Frontend for Apache Spark
-Version: 2.0.0
+Version: 2.0.1
 Date: 2016-08-27
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
 email = "shiva...@cs.berkeley.edu"),

http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index ca6daa2..6db3a59 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.2-SNAPSHOT</version>
+    <version>2.0.1</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index c727f54..269b845 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.2-SNAPSHOT</version>
+    <version>2.0.1</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index e335a89..20cf29e 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.2-SNAPSHOT</version>
+    <version>2.0.1</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 8e64f56..25cc328 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.2-SNAPSHOT</version>
+    <version>2.0.1</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9d28cc10/common/sketch/pom.xml

[spark] Git Push Summary

2016-09-23 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v2.0.1-rc3 [created] 9d28cc103




[2/2] spark git commit: Preparing development version 2.0.2-SNAPSHOT

2016-09-23 Thread pwendell
Preparing development version 2.0.2-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5bc5b49f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5bc5b49f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5bc5b49f

Branch: refs/heads/branch-2.0
Commit: 5bc5b49fa0a5f3d395457aceff268938317f3718
Parents: 9d28cc1
Author: Patrick Wendell 
Authored: Fri Sep 23 14:38:13 2016 -0700
Committer: Patrick Wendell 
Committed: Fri Sep 23 14:38:13 2016 -0700

--
 R/pkg/DESCRIPTION | 2 +-
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 docs/_config.yml  | 4 ++--
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 36 files changed, 37 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 3e49eac..dfb7e22 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R Frontend for Apache Spark
-Version: 2.0.1
+Version: 2.0.2
 Date: 2016-08-27
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
 email = "shiva...@cs.berkeley.edu"),

http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 6db3a59..ca6daa2 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.1</version>
+    <version>2.0.2-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 269b845..c727f54 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.1</version>
+    <version>2.0.2-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 20cf29e..e335a89 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.1</version>
+    <version>2.0.2-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 25cc328..8e64f56 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.1</version>
+    <version>2.0.2-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5bc5b49f/common/sketch/pom.xml

spark git commit: [SPARK-17651][SPARKR] Set R package version number along with mvn

2016-09-23 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 452e468f2 -> b111a81f2


[SPARK-17651][SPARKR] Set R package version number along with mvn

This PR sets the R package version while tagging releases. Note that since R 
doesn't accept `-SNAPSHOT` in the version number field, we remove it while 
setting the next version.

Tested manually by running locally

Author: Shivaram Venkataraman 

Closes #15223 from shivaram/sparkr-version-change.

(cherry picked from commit 7c382524a959a2bc9b3d2fca44f6f0b41aba4e3c)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b111a81f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b111a81f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b111a81f

Branch: refs/heads/branch-2.0
Commit: b111a81f2a5547e2357d66db4ba2f05ce69a52a6
Parents: 452e468
Author: Shivaram Venkataraman 
Authored: Fri Sep 23 14:35:18 2016 -0700
Committer: Reynold Xin 
Committed: Fri Sep 23 14:36:01 2016 -0700

--
 dev/create-release/release-tag.sh | 15 +++
 1 file changed, 15 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b111a81f/dev/create-release/release-tag.sh
--
diff --git a/dev/create-release/release-tag.sh 
b/dev/create-release/release-tag.sh
index d404939..b7e5100 100755
--- a/dev/create-release/release-tag.sh
+++ b/dev/create-release/release-tag.sh
@@ -60,12 +60,27 @@ git config user.email $GIT_EMAIL
 
 # Create release version
 $MVN versions:set -DnewVersion=$RELEASE_VERSION | grep -v "no value" # silence 
logs
+# Set the release version in R/pkg/DESCRIPTION
+sed -i".tmp1" 's/Version.*$/Version: '"$RELEASE_VERSION"'/g' R/pkg/DESCRIPTION
+# Set the release version in docs
+sed -i".tmp1" 's/SPARK_VERSION:.*$/SPARK_VERSION: '"$RELEASE_VERSION"'/g' 
docs/_config.yml
+sed -i".tmp2" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: 
'"$RELEASE_VERSION"'/g' docs/_config.yml
+
 git commit -a -m "Preparing Spark release $RELEASE_TAG"
 echo "Creating tag $RELEASE_TAG at the head of $GIT_BRANCH"
 git tag $RELEASE_TAG
 
 # Create next version
 $MVN versions:set -DnewVersion=$NEXT_VERSION | grep -v "no value" # silence 
logs
+# Remove -SNAPSHOT before setting the R version as R expects version strings 
to only have numbers
+R_NEXT_VERSION=`echo $NEXT_VERSION | sed 's/-SNAPSHOT//g'`
+sed -i".tmp2" 's/Version.*$/Version: '"$R_NEXT_VERSION"'/g' R/pkg/DESCRIPTION
+
+# Update docs with next version
+sed -i".tmp3" 's/SPARK_VERSION:.*$/SPARK_VERSION: '"$NEXT_VERSION"'/g' 
docs/_config.yml
+# Use R version for short version
+sed -i".tmp4" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: 
'"$R_NEXT_VERSION"'/g' docs/_config.yml
+
 git commit -a -m "Preparing development version $NEXT_VERSION"
 
 # Push changes





spark git commit: [SPARK-17651][SPARKR] Set R package version number along with mvn

2016-09-23 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 90a30f463 -> 7c382524a


[SPARK-17651][SPARKR] Set R package version number along with mvn

## What changes were proposed in this pull request?

This PR sets the R package version while tagging releases. Note that since R 
doesn't accept `-SNAPSHOT` in the version number field, we remove it while 
setting the next version.

## How was this patch tested?

Tested manually by running locally

Author: Shivaram Venkataraman 

Closes #15223 from shivaram/sparkr-version-change.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c382524
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c382524
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c382524

Branch: refs/heads/master
Commit: 7c382524a959a2bc9b3d2fca44f6f0b41aba4e3c
Parents: 90a30f4
Author: Shivaram Venkataraman 
Authored: Fri Sep 23 14:35:18 2016 -0700
Committer: Reynold Xin 
Committed: Fri Sep 23 14:35:18 2016 -0700

--
 dev/create-release/release-tag.sh | 15 +++
 1 file changed, 15 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7c382524/dev/create-release/release-tag.sh
--
diff --git a/dev/create-release/release-tag.sh 
b/dev/create-release/release-tag.sh
index d404939..b7e5100 100755
--- a/dev/create-release/release-tag.sh
+++ b/dev/create-release/release-tag.sh
@@ -60,12 +60,27 @@ git config user.email $GIT_EMAIL
 
 # Create release version
 $MVN versions:set -DnewVersion=$RELEASE_VERSION | grep -v "no value" # silence 
logs
+# Set the release version in R/pkg/DESCRIPTION
+sed -i".tmp1" 's/Version.*$/Version: '"$RELEASE_VERSION"'/g' R/pkg/DESCRIPTION
+# Set the release version in docs
+sed -i".tmp1" 's/SPARK_VERSION:.*$/SPARK_VERSION: '"$RELEASE_VERSION"'/g' 
docs/_config.yml
+sed -i".tmp2" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: 
'"$RELEASE_VERSION"'/g' docs/_config.yml
+
 git commit -a -m "Preparing Spark release $RELEASE_TAG"
 echo "Creating tag $RELEASE_TAG at the head of $GIT_BRANCH"
 git tag $RELEASE_TAG
 
 # Create next version
 $MVN versions:set -DnewVersion=$NEXT_VERSION | grep -v "no value" # silence 
logs
+# Remove -SNAPSHOT before setting the R version as R expects version strings 
to only have numbers
+R_NEXT_VERSION=`echo $NEXT_VERSION | sed 's/-SNAPSHOT//g'`
+sed -i".tmp2" 's/Version.*$/Version: '"$R_NEXT_VERSION"'/g' R/pkg/DESCRIPTION
+
+# Update docs with next version
+sed -i".tmp3" 's/SPARK_VERSION:.*$/SPARK_VERSION: '"$NEXT_VERSION"'/g' 
docs/_config.yml
+# Use R version for short version
+sed -i".tmp4" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: 
'"$R_NEXT_VERSION"'/g' docs/_config.yml
+
 git commit -a -m "Preparing development version $NEXT_VERSION"
 
 # Push changes





[1/2] spark git commit: [SPARK-12221] add cpu time to metrics

2016-09-23 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 988c71457 -> 90a30f463


http://git-wip-us.apache.org/repos/asf/spark/blob/90a30f46/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 00314ab..d5146d7 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -606,6 +606,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
 
   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
 assert(metrics1.executorDeserializeTime === 
metrics2.executorDeserializeTime)
+assert(metrics1.executorDeserializeCpuTime === 
metrics2.executorDeserializeCpuTime)
+assert(metrics1.executorRunTime === metrics2.executorRunTime)
+assert(metrics1.executorCpuTime === metrics2.executorCpuTime)
 assert(metrics1.resultSize === metrics2.resultSize)
 assert(metrics1.jvmGCTime === metrics2.jvmGCTime)
 assert(metrics1.resultSerializationTime === 
metrics2.resultSerializationTime)
@@ -816,8 +819,11 @@ private[spark] object JsonProtocolSuite extends Assertions 
{
   hasOutput: Boolean,
   hasRecords: Boolean = true) = {
 val t = TaskMetrics.empty
+// Set CPU times same as wall times for testing purpose
 t.setExecutorDeserializeTime(a)
+t.setExecutorDeserializeCpuTime(a)
 t.setExecutorRunTime(b)
+t.setExecutorCpuTime(b)
 t.setResultSize(c)
 t.setJvmGCTime(d)
 t.setResultSerializationTime(a + b)
@@ -1097,7 +1103,9 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
   |  },
   |  "Task Metrics": {
   |"Executor Deserialize Time": 300,
+  |"Executor Deserialize CPU Time": 300,
   |"Executor Run Time": 400,
+  |"Executor CPU Time": 400,
   |"Result Size": 500,
   |"JVM GC Time": 600,
   |"Result Serialization Time": 700,
@@ -1195,7 +1203,9 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
   |  },
   |  "Task Metrics": {
   |"Executor Deserialize Time": 300,
+  |"Executor Deserialize CPU Time": 300,
   |"Executor Run Time": 400,
+  |"Executor CPU Time": 400,
   |"Result Size": 500,
   |"JVM GC Time": 600,
   |"Result Serialization Time": 700,
@@ -1293,7 +1303,9 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
   |  },
   |  "Task Metrics": {
   |"Executor Deserialize Time": 300,
+  |"Executor Deserialize CPU Time": 300,
   |"Executor Run Time": 400,
+  |"Executor CPU Time": 400,
   |"Result Size": 500,
   |"JVM GC Time": 600,
   |"Result Serialization Time": 700,
@@ -1785,55 +1797,70 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
   |},
   |{
   |  "ID": 1,
+  |  "Name": "$EXECUTOR_DESERIALIZE_CPU_TIME",
+  |  "Update": 300,
+  |  "Internal": true,
+  |  "Count Failed Values": true
+  |},
+  |
+  |{
+  |  "ID": 2,
   |  "Name": "$EXECUTOR_RUN_TIME",
   |  "Update": 400,
   |  "Internal": true,
   |  "Count Failed Values": true
   |},
   |{
-  |  "ID": 2,
+  |  "ID": 3,
+  |  "Name": "$EXECUTOR_CPU_TIME",
+  |  "Update": 400,
+  |  "Internal": true,
+  |  "Count Failed Values": true
+  |},
+  |{
+  |  "ID": 4,
   |  "Name": "$RESULT_SIZE",
   |  "Update": 500,
   |  "Internal": true,
   |  "Count Failed Values": true
   |},
   |{
-  |  "ID": 3,
+  |  "ID": 5,
   |  "Name": "$JVM_GC_TIME",
   |  "Update": 600,
   |  "Internal": true,
   |  "Count Failed Values": true
   |},
   |{
-  |  "ID": 4,
+  |  "ID": 6,
   |  "Name": "$RESULT_SERIALIZATION_TIME",
   |  "Update": 700,
   |  "Internal": true,
   |  "Count Failed Values": true
   |},
   |{
-  |  "ID": 5,
+  |  "ID": 7,
   |  "Name": "$MEMORY_BYTES_SPILLED",
   |  "Update": 800,
   |  "Internal": true,
   |  "Count Failed Values": true
   |},
   |{
-  |  "ID": 6,
+  |  "ID": 8,
   |  "Name": "$DISK_BYTES_SPILLED",
   |  "Update": 0,
   |  "Internal": true,
   |  "Count Failed Values": 

[2/2] spark git commit: [SPARK-12221] add cpu time to metrics

2016-09-23 Thread vanzin
[SPARK-12221] add cpu time to metrics

Currently task metrics don't support executor CPU time, so there's no way to 
calculate how much CPU time a stage/task took from History Server metrics. This 
PR enables reporting CPU time.

Author: jisookim 

Closes #10212 from jisookim0513/add-cpu-time-metric.
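A standalone sketch of the measurement approach the patch uses in `Executor.scala` below (JMX thread CPU time, guarded by a support check); the workload here is only a placeholder:

```
import java.lang.management.ManagementFactory

object CpuTimeSketch {
  def main(args: Array[String]): Unit = {
    val threadMXBean = ManagementFactory.getThreadMXBean

    // Thread CPU time is reported in nanoseconds and may be unsupported on some JVMs.
    def currentCpuNs: Long =
      if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L

    val startWallMs = System.currentTimeMillis()
    val startCpuNs = currentCpuNs

    // Placeholder workload standing in for task deserialization + execution.
    val ignored = (1 to 1000000).map(i => math.sqrt(i.toDouble)).sum

    val wallMs = System.currentTimeMillis() - startWallMs
    val cpuMs = (currentCpuNs - startCpuNs) / 1000000
    println(s"wall time: $wallMs ms, cpu time: $cpuMs ms")
  }
}
```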


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90a30f46
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90a30f46
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90a30f46

Branch: refs/heads/master
Commit: 90a30f46349182b6fc9d4123090c4712fdb425be
Parents: 988c714
Author: jisookim 
Authored: Fri Sep 23 13:43:47 2016 -0700
Committer: Marcelo Vanzin 
Committed: Fri Sep 23 13:43:47 2016 -0700

--
 .../org/apache/spark/InternalAccumulator.scala  |   2 +
 .../org/apache/spark/executor/Executor.scala|  15 +++
 .../org/apache/spark/executor/TaskMetrics.scala |  18 
 .../org/apache/spark/scheduler/ResultTask.scala |   8 ++
 .../apache/spark/scheduler/ShuffleMapTask.scala |   8 ++
 .../scala/org/apache/spark/scheduler/Task.scala |   2 +
 .../spark/status/api/v1/AllStagesResource.scala |   5 +
 .../org/apache/spark/status/api/v1/api.scala|   5 +
 .../spark/ui/jobs/JobProgressListener.scala |   4 +
 .../scala/org/apache/spark/ui/jobs/UIData.scala |   5 +
 .../org/apache/spark/util/JsonProtocol.scala|  10 ++
 .../complete_stage_list_json_expectation.json   |   3 +
 .../failed_stage_list_json_expectation.json |   1 +
 .../one_stage_attempt_json_expectation.json |  17 
 .../one_stage_json_expectation.json |  17 
 .../stage_list_json_expectation.json|   4 +
 ..._list_with_accumulable_json_expectation.json |   1 +
 .../stage_task_list_expectation.json|  40 
 ...m_multi_attempt_app_json_1__expectation.json |  16 +++
 ...m_multi_attempt_app_json_2__expectation.json |  16 +++
 ...ask_list_w__offset___length_expectation.json | 100 +++
 .../stage_task_list_w__sortBy_expectation.json  |  40 
 ...ortBy_short_names___runtime_expectation.json |  40 
 ...sortBy_short_names__runtime_expectation.json |  40 
 ...summary_w__custom_quantiles_expectation.json |   2 +
 ...task_summary_w_shuffle_read_expectation.json |   2 +
 ...ask_summary_w_shuffle_write_expectation.json |   2 +
 ...stage_with_accumulable_json_expectation.json |  17 
 .../apache/spark/util/JsonProtocolSuite.scala   |  69 +
 project/MimaExcludes.scala  |   4 +
 30 files changed, 492 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/90a30f46/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
--
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala 
b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index 0b494c1..82d3098 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -31,7 +31,9 @@ private[spark] object InternalAccumulator {
 
   // Names of internal task level metrics
   val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + "executorDeserializeTime"
+  val EXECUTOR_DESERIALIZE_CPU_TIME = METRICS_PREFIX + 
"executorDeserializeCpuTime"
   val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime"
+  val EXECUTOR_CPU_TIME = METRICS_PREFIX + "executorCpuTime"
   val RESULT_SIZE = METRICS_PREFIX + "resultSize"
   val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime"
   val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime"

http://git-wip-us.apache.org/repos/asf/spark/blob/90a30f46/core/src/main/scala/org/apache/spark/executor/Executor.scala
--
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 668ec41..9501dd9 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -232,13 +232,18 @@ private[spark] class Executor(
 }
 
 override def run(): Unit = {
+  val threadMXBean = ManagementFactory.getThreadMXBean
   val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
   val deserializeStartTime = System.currentTimeMillis()
+  val deserializeStartCpuTime = if 
(threadMXBean.isCurrentThreadCpuTimeSupported) {
+threadMXBean.getCurrentThreadCpuTime
+  } else 0L
   Thread.currentThread.setContextClassLoader(replClassLoader)
   val ser = env.closureSerializer.newInstance()
   

spark git commit: [SPARK-17577][CORE][2.0 BACKPORT] Update SparkContext.addFile to make it work well on Windows

2016-09-23 Thread sarutak
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1a8ea000e -> 452e468f2


[SPARK-17577][CORE][2.0 BACKPORT] Update SparkContext.addFile to make it work 
well on Windows

## What changes were proposed in this pull request?
Update ```SparkContext.addFile``` to correct the use of ```URI``` and 
```Path``` so that it works well on Windows. This is the branch-2.0 
backport; more details at #15131.

## How was this patch tested?
Backport, checked by appveyor.

Author: Yanbo Liang 

Closes #15217 from yanboliang/uri-2.0.
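A small sketch of why the patch switches from ```new URI(path)``` to ```new Path(path).toUri``` (the Windows-style path below is a made-up example; Hadoop's `Path` is the class used in the diff):

```
import java.net.URI
import org.apache.hadoop.fs.Path

val winPath = "C:/Users/test/data.txt"       // illustrative local Windows path

// java.net.URI parses the drive letter as a URI scheme ("C"), so the
// `case null | "local"` branch in addFile would not be taken.
println(new URI(winPath).getScheme)          // prints: C

// On Windows, org.apache.hadoop.fs.Path recognizes the drive letter and
// yields a scheme-less URI, so the file is handled as a local path.
println(new Path(winPath).toUri.getScheme)   // expected: null on Windows
```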


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/452e468f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/452e468f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/452e468f

Branch: refs/heads/branch-2.0
Commit: 452e468f280d69c930782a7588a87a816cc9585a
Parents: 1a8ea00
Author: Yanbo Liang 
Authored: Sat Sep 24 04:50:22 2016 +0900
Committer: Kousuke Saruta 
Committed: Sat Sep 24 04:50:22 2016 +0900

--
 core/src/main/scala/org/apache/spark/SparkContext.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/452e468f/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 214758f..251c16f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1419,7 +1419,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
* supported for Hadoop-supported filesystems.
*/
   def addFile(path: String, recursive: Boolean): Unit = {
-val uri = new URI(path)
+val uri = new Path(path).toUri
 val schemeCorrectedPath = uri.getScheme match {
   case null | "local" => new File(path).getCanonicalFile.toURI.toString
   case _ => path
@@ -1453,8 +1453,8 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   logInfo(s"Added file $path at $key with timestamp $timestamp")
   // Fetch the file locally so that closures which are run on the driver 
can still use the
   // SparkFiles API to access files.
-  Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, 
env.securityManager,
-hadoopConfiguration, timestamp, useCache = false)
+  Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), 
conf,
+env.securityManager, hadoopConfiguration, timestamp, useCache = false)
   postEnvironmentUpdate()
 }
   }





spark git commit: [SPARK-17643] Remove comparable requirement from Offset

2016-09-23 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master f62ddc598 -> 988c71457


[SPARK-17643] Remove comparable requirement from Offset

For some sources, it is difficult to provide a global ordering based only on 
the data in the offset. Since we don't use comparison for correctness, let's 
remove it.

Author: Michael Armbrust 

Closes #15207 from marmbrus/removeComparable.
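A simplified sketch of the resulting contract: offsets only need stable equality, not an ordering. The trait and case class here are stripped-down stand-ins for the classes touched in this diff, not the real API:

```
// Equality is enough to answer "has the source moved?"; no Comparable needed.
trait Offset extends Serializable

case class LongOffset(offset: Long) extends Offset {
  def +(increment: Long): LongOffset = LongOffset(offset + increment)
}

// A source whose progress has no natural global order can still participate.
case class PartitionOffsets(perPartition: Map[Int, Long]) extends Offset

object OffsetSketch extends App {
  val committed: Offset = LongOffset(41)
  val available: Offset = LongOffset(42)
  if (available != committed) println("new data to process")
}
```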


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/988c7145
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/988c7145
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/988c7145

Branch: refs/heads/master
Commit: 988c71457354b0a443471f501cef544a85b1a76a
Parents: f62ddc5
Author: Michael Armbrust 
Authored: Fri Sep 23 12:17:59 2016 -0700
Committer: Tathagata Das 
Committed: Fri Sep 23 12:17:59 2016 -0700

--
 .../execution/streaming/CompositeOffset.scala   | 30 ---
 .../sql/execution/streaming/LongOffset.scala|  6 ---
 .../spark/sql/execution/streaming/Offset.scala  | 19 ++
 .../execution/streaming/StreamExecution.scala   |  9 +++--
 .../spark/sql/streaming/OffsetSuite.scala   | 39 
 5 files changed, 9 insertions(+), 94 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/988c7145/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
index 729c846..ebc6ee8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
@@ -24,36 +24,6 @@ package org.apache.spark.sql.execution.streaming
  */
 case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
   /**
-   * Returns a negative integer, zero, or a positive integer as this object is 
less than, equal to,
-   * or greater than the specified object.
-   */
-  override def compareTo(other: Offset): Int = other match {
-case otherComposite: CompositeOffset if otherComposite.offsets.size == 
offsets.size =>
-  val comparisons = offsets.zip(otherComposite.offsets).map {
-case (Some(a), Some(b)) => a compareTo b
-case (None, None) => 0
-case (None, _) => -1
-case (_, None) => 1
-  }
-  val nonZeroSigns = comparisons.map(sign).filter(_ != 0).toSet
-  nonZeroSigns.size match {
-case 0 => 0   // if both empty or only 0s
-case 1 => nonZeroSigns.head   // if there are only (0s and 1s) or 
(0s and -1s)
-case _ => // there are both 1s and -1s
-  throw new IllegalArgumentException(
-s"Invalid comparison between non-linear histories: $this <=> 
$other")
-  }
-case _ =>
-  throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
-  }
-
-  private def sign(num: Int): Int = num match {
-case i if i < 0 => -1
-case i if i == 0 => 0
-case i if i > 0 => 1
-  }
-
-  /**
* Unpacks an offset into [[StreamProgress]] by associating each offset with 
the order list of
* sources.
*

http://git-wip-us.apache.org/repos/asf/spark/blob/988c7145/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index bb17640..c5e8827 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -22,12 +22,6 @@ package org.apache.spark.sql.execution.streaming
  */
 case class LongOffset(offset: Long) extends Offset {
 
-  override def compareTo(other: Offset): Int = other match {
-case l: LongOffset => offset.compareTo(l.offset)
-case _ =>
-  throw new IllegalArgumentException(s"Invalid comparison of $getClass 
with ${other.getClass}")
-  }
-
   def +(increment: Long): LongOffset = new LongOffset(offset + increment)
   def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/988c7145/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
--
diff --git 

spark git commit: [SPARK-17210][SPARKR] sparkr.zip is not distributed to executors when running sparkr in RStudio

2016-09-23 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d3f90e71a -> 1a8ea000e


[SPARK-17210][SPARKR] sparkr.zip is not distributed to executors when running 
sparkr in RStudio

## What changes were proposed in this pull request?

Spark adds sparkr.zip to the archives only in YARN mode (SparkSubmit.scala).
```
if (args.isR && clusterManager == YARN) {
  val sparkRPackagePath = RUtils.localSparkRPackagePath
  if (sparkRPackagePath.isEmpty) {
printErrorAndExit("SPARK_HOME does not exist for R application in YARN 
mode.")
  }
  val sparkRPackageFile = new File(sparkRPackagePath.get, 
SPARKR_PACKAGE_ARCHIVE)
  if (!sparkRPackageFile.exists()) {
printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R 
application in YARN mode.")
  }
  val sparkRPackageURI = 
Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString

  // Distribute the SparkR package.
  // Assigns a symbol link name "sparkr" to the shipped package.
  args.archives = mergeFileLists(args.archives, sparkRPackageURI + 
"#sparkr")

  // Distribute the R package archive containing all the built R packages.
  if (!RUtils.rPackages.isEmpty) {
val rPackageFile =
  RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), 
R_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
  printErrorAndExit("Failed to zip all the built R packages.")
}

val rPackageURI = 
Utils.resolveURI(rPackageFile.getAbsolutePath).toString
// Assigns a symbol link name "rpkg" to the shipped package.
args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
  }
}
```
So it is necessary to pass spark.master from the R process to the JVM; 
otherwise sparkr.zip won't be distributed to the executors. Besides that, I 
also pass spark.yarn.keytab/spark.yarn.principal to the Spark side, because 
the JVM process needs them to access a secured cluster.

## How was this patch tested?

Verify it manually in R Studio using the following code.
```
Sys.setenv(SPARK_HOME="/Users/jzhang/github/spark")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master="yarn-client", sparkConfig = 
list(spark.executor.instances="1"))
df <- as.DataFrame(mtcars)
head(df)

```

…

Author: Jeff Zhang 

Closes #14784 from zjffdu/SPARK-17210.

(cherry picked from commit f62ddc5983a08d4d54c0a9a8210dd6cbec555671)
Signed-off-by: Felix Cheung 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a8ea000
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a8ea000
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a8ea000

Branch: refs/heads/branch-2.0
Commit: 1a8ea000e7e16bdee54c47ab0f5e197c15f200a6
Parents: d3f90e7
Author: Jeff Zhang 
Authored: Fri Sep 23 11:37:43 2016 -0700
Committer: Felix Cheung 
Committed: Fri Sep 23 11:38:21 2016 -0700

--
 R/pkg/R/sparkR.R |  4 
 docs/sparkr.md   | 15 +++
 2 files changed, 19 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1a8ea000/R/pkg/R/sparkR.R
--
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 0601536..cc6d591 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -491,6 +491,10 @@ sparkConfToSubmitOps[["spark.driver.memory"]]   <- 
"--driver-memory"
 sparkConfToSubmitOps[["spark.driver.extraClassPath"]]   <- 
"--driver-class-path"
 sparkConfToSubmitOps[["spark.driver.extraJavaOptions"]] <- 
"--driver-java-options"
 sparkConfToSubmitOps[["spark.driver.extraLibraryPath"]] <- 
"--driver-library-path"
+sparkConfToSubmitOps[["spark.master"]] <- "--master"
+sparkConfToSubmitOps[["spark.yarn.keytab"]] <- "--keytab"
+sparkConfToSubmitOps[["spark.yarn.principal"]] <- "--principal"
+
 
 # Utility function that returns Spark Submit arguments as a string
 #

http://git-wip-us.apache.org/repos/asf/spark/blob/1a8ea000/docs/sparkr.md
--
diff --git a/docs/sparkr.md b/docs/sparkr.md
index b881119..340e7f7 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -63,6 +63,21 @@ The following Spark driver properties can be set in 
`sparkConfig` with `sparkR.s
 
   <tr><td>Property Name</td><td>Property group</td><td>spark-submit equivalent</td></tr>
   <tr>
+    <td><code>spark.master</code></td>
+    <td>Application Properties</td>
+    <td><code>--master</code></td>
+  </tr>
+  <tr>
+    <td><code>spark.yarn.keytab</code></td>
+    <td>Application Properties</td>
+    <td><code>--keytab</code></td>
+  </tr>
+  <tr>
+    <td><code>spark.yarn.principal</code></td>
+    <td>Application Properties</td>
+    <td><code>--principal</code></td>
+  </tr>
+  <tr>
     <td><code>spark.driver.memory</code></td>
     <td>Application Properties</td>
     <td><code>--driver-memory</code></td>



spark git commit: [SPARK-17210][SPARKR] sparkr.zip is not distributed to executors when running sparkr in RStudio

2016-09-23 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master f89808b0f -> f62ddc598


[SPARK-17210][SPARKR] sparkr.zip is not distributed to executors when running 
sparkr in RStudio

## What changes were proposed in this pull request?

Spark adds sparkr.zip to the archives only in YARN mode (SparkSubmit.scala).
```
if (args.isR && clusterManager == YARN) {
  val sparkRPackagePath = RUtils.localSparkRPackagePath
  if (sparkRPackagePath.isEmpty) {
printErrorAndExit("SPARK_HOME does not exist for R application in YARN 
mode.")
  }
  val sparkRPackageFile = new File(sparkRPackagePath.get, 
SPARKR_PACKAGE_ARCHIVE)
  if (!sparkRPackageFile.exists()) {
printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R 
application in YARN mode.")
  }
  val sparkRPackageURI = 
Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString

  // Distribute the SparkR package.
  // Assigns a symbol link name "sparkr" to the shipped package.
  args.archives = mergeFileLists(args.archives, sparkRPackageURI + 
"#sparkr")

  // Distribute the R package archive containing all the built R packages.
  if (!RUtils.rPackages.isEmpty) {
val rPackageFile =
  RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), 
R_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
  printErrorAndExit("Failed to zip all the built R packages.")
}

val rPackageURI = 
Utils.resolveURI(rPackageFile.getAbsolutePath).toString
// Assigns a symbol link name "rpkg" to the shipped package.
args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
  }
}
```
So it is necessary to pass spark.master from the R process to the JVM; 
otherwise sparkr.zip won't be distributed to the executors. Besides that, I 
also pass spark.yarn.keytab/spark.yarn.principal to the Spark side, because 
the JVM process needs them to access a secured cluster.

## How was this patch tested?

Verify it manually in R Studio using the following code.
```
Sys.setenv(SPARK_HOME="/Users/jzhang/github/spark")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master="yarn-client", sparkConfig = 
list(spark.executor.instances="1"))
df <- as.DataFrame(mtcars)
head(df)

```

…

Author: Jeff Zhang 

Closes #14784 from zjffdu/SPARK-17210.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f62ddc59
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f62ddc59
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f62ddc59

Branch: refs/heads/master
Commit: f62ddc5983a08d4d54c0a9a8210dd6cbec555671
Parents: f89808b
Author: Jeff Zhang 
Authored: Fri Sep 23 11:37:43 2016 -0700
Committer: Felix Cheung 
Committed: Fri Sep 23 11:37:43 2016 -0700

--
 R/pkg/R/sparkR.R |  4 
 docs/sparkr.md   | 15 +++
 2 files changed, 19 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f62ddc59/R/pkg/R/sparkR.R
--
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 0601536..cc6d591 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -491,6 +491,10 @@ sparkConfToSubmitOps[["spark.driver.memory"]]   <- 
"--driver-memory"
 sparkConfToSubmitOps[["spark.driver.extraClassPath"]]   <- 
"--driver-class-path"
 sparkConfToSubmitOps[["spark.driver.extraJavaOptions"]] <- 
"--driver-java-options"
 sparkConfToSubmitOps[["spark.driver.extraLibraryPath"]] <- 
"--driver-library-path"
+sparkConfToSubmitOps[["spark.master"]] <- "--master"
+sparkConfToSubmitOps[["spark.yarn.keytab"]] <- "--keytab"
+sparkConfToSubmitOps[["spark.yarn.principal"]] <- "--principal"
+
 
 # Utility function that returns Spark Submit arguments as a string
 #

http://git-wip-us.apache.org/repos/asf/spark/blob/f62ddc59/docs/sparkr.md
--
diff --git a/docs/sparkr.md b/docs/sparkr.md
index b881119..340e7f7 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -63,6 +63,21 @@ The following Spark driver properties can be set in 
`sparkConfig` with `sparkR.s
 
   <tr><td>Property Name</td><td>Property group</td><td>spark-submit equivalent</td></tr>
   <tr>
+    <td><code>spark.master</code></td>
+    <td>Application Properties</td>
+    <td><code>--master</code></td>
+  </tr>
+  <tr>
+    <td><code>spark.yarn.keytab</code></td>
+    <td>Application Properties</td>
+    <td><code>--keytab</code></td>
+  </tr>
+  <tr>
+    <td><code>spark.yarn.principal</code></td>
+    <td>Application Properties</td>
+    <td><code>--principal</code></td>
+  </tr>
+  <tr>
     <td><code>spark.driver.memory</code></td>
     <td>Application Properties</td>
     <td><code>--driver-memory</code></td>





spark git commit: [SPARK-17499][SPARKR][ML][MLLIB] make the default params in sparkR spark.mlp consistent with MultilayerPerceptronClassifier

2016-09-23 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master 90d575421 -> f89808b0f


[SPARK-17499][SPARKR][ML][MLLIB] make the default params in sparkR spark.mlp 
consistent with MultilayerPerceptronClassifier

## What changes were proposed in this pull request?

Update the `MultilayerPerceptronClassifierWrapper.fit` parameter types:
`layers: Array[Int]`
`seed: String`

Update several default params in SparkR `spark.mlp`:
`tol` --> 1e-6
`stepSize` --> 0.03
`seed` --> NULL (when seed == NULL, the Scala-side wrapper regards it as a 
`null` value and falls back to the default seed)
The R-side `seed` only supports 32-bit integers.

Remove the default value of `layers` and move it in front of the parameters 
that have default values.
Add a validation check for the `layers` parameter.

## How was this patch tested?

tests added.

Author: WeichenXu 

Closes #15051 from WeichenXu123/update_py_mlp_default.
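For comparison, a sketch of the Scala-side estimator whose defaults `spark.mlp` now mirrors (the layer sizes and seed follow the tests added below; the training DataFrame is a placeholder):

```
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

// blockSize = 128, solver = "l-bfgs", maxIter = 100, tol = 1e-6 and
// stepSize = 0.03 are the defaults the R wrapper is being aligned with.
val mlp = new MultilayerPerceptronClassifier()
  .setLayers(Array(4, 5, 4, 3))   // required: no default, as in spark.mlp now
  .setMaxIter(10)
  .setSeed(10L)                   // optional; omit to use the estimator's default seed

// val model = mlp.fit(trainingDF)   // trainingDF: a DataFrame with features/label columns
```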


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f89808b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f89808b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f89808b0

Branch: refs/heads/master
Commit: f89808b0fdbc04e1bdff1489a6ec4c84ddb2adc4
Parents: 90d5754
Author: WeichenXu 
Authored: Fri Sep 23 11:14:22 2016 -0700
Committer: Felix Cheung 
Committed: Fri Sep 23 11:14:22 2016 -0700

--
 R/pkg/R/mllib.R  | 13 ++---
 R/pkg/inst/tests/testthat/test_mllib.R   | 19 +++
 .../MultilayerPerceptronClassifierWrapper.scala  |  8 
 3 files changed, 33 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f89808b0/R/pkg/R/mllib.R
--
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 98db367..971c166 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -694,12 +694,19 @@ setMethod("predict", signature(object = "KMeansModel"),
 #' }
 #' @note spark.mlp since 2.1.0
 setMethod("spark.mlp", signature(data = "SparkDataFrame"),
-  function(data, blockSize = 128, layers = c(3, 5, 2), solver = 
"l-bfgs", maxIter = 100,
-   tol = 0.5, stepSize = 1, seed = 1) {
+  function(data, layers, blockSize = 128, solver = "l-bfgs", maxIter = 
100,
+   tol = 1E-6, stepSize = 0.03, seed = NULL) {
+layers <- as.integer(na.omit(layers))
+if (length(layers) <= 1) {
+  stop ("layers must be a integer vector with length > 1.")
+}
+if (!is.null(seed)) {
+  seed <- as.character(as.integer(seed))
+}
 jobj <- 
callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper",
 "fit", data@sdf, as.integer(blockSize), 
as.array(layers),
 as.character(solver), as.integer(maxIter), 
as.numeric(tol),
-as.numeric(stepSize), as.integer(seed))
+as.numeric(stepSize), seed)
 new("MultilayerPerceptronClassificationModel", jobj = jobj)
   })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f89808b0/R/pkg/inst/tests/testthat/test_mllib.R
--
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R 
b/R/pkg/inst/tests/testthat/test_mllib.R
index 24c40a8..a1eaaf2 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -391,6 +391,25 @@ test_that("spark.mlp", {
 
   unlink(modelPath)
 
+  # Test default parameter
+  model <- spark.mlp(df, layers = c(4, 5, 4, 3))
+  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+  expect_equal(head(mlpPredictions$prediction, 10), c(1, 1, 1, 1, 0, 1, 2, 2, 
1, 0))
+
+  # Test illegal parameter
+  expect_error(spark.mlp(df, layers = NULL), "layers must be a integer vector 
with length > 1.")
+  expect_error(spark.mlp(df, layers = c()), "layers must be a integer vector 
with length > 1.")
+  expect_error(spark.mlp(df, layers = c(3)), "layers must be a integer vector 
with length > 1.")
+
+  # Test random seed
+  # default seed
+  model <- spark.mlp(df, layers = c(4, 5, 4, 3), maxIter = 10)
+  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 2, 2, 
1, 2, 0, 1))
+  # seed equals 10
+  model <- spark.mlp(df, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
+  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 2, 1, 2, 2, 
1, 0, 0, 1))
 })
 
 test_that("spark.naiveBayes", {


spark git commit: [SPARK-16861][PYSPARK][CORE] Refactor PySpark accumulator API on top of Accumulator V2

2016-09-23 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 5c5396cb4 -> 90d575421


[SPARK-16861][PYSPARK][CORE] Refactor PySpark accumulator API on top of 
Accumulator V2

## What changes were proposed in this pull request?

Move the internals of the PySpark accumulator API off the old deprecated 
Accumulator API and onto the new AccumulatorV2 API.

## How was this patch tested?

The existing PySpark accumulator tests (both unit tests and doc tests at the 
start of accumulator.py).

Author: Holden Karau 

Closes #14467 from holdenk/SPARK-16861-refactor-pyspark-accumulator-api.
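A stripped-down sketch of the AccumulatorV2 shape the Python accumulator now builds on: a `CollectionAccumulator` of raw byte updates whose `merge` is the driver-side hook (the real `PythonAccumulatorV2` forwards updates over a socket; this sketch just counts them):

```
import java.util.{List => JList}

import org.apache.spark.SparkContext
import org.apache.spark.util.{AccumulatorV2, CollectionAccumulator}

class ByteUpdatesAccumulator extends CollectionAccumulator[Array[Byte]] {
  @volatile var mergedUpdates: Int = 0

  // Called on the driver when task updates come back; PythonAccumulatorV2
  // uses this point to ship the pickled updates to the Python process.
  override def merge(other: AccumulatorV2[Array[Byte], JList[Array[Byte]]]): Unit = {
    mergedUpdates += other.value.size()
    super.merge(other)
  }
}

// Usage, assuming an existing SparkContext `sc`:
// val acc = new ByteUpdatesAccumulator
// sc.register(acc, "python-style byte updates")
// sc.parallelize(1 to 4).foreach(i => acc.add(Array(i.toByte)))
```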


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d57542
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d57542
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d57542

Branch: refs/heads/master
Commit: 90d5754212425d55f992c939a2bc7d9ac6ef92b8
Parents: 5c5396c
Author: Holden Karau 
Authored: Fri Sep 23 09:44:30 2016 +0100
Committer: Sean Owen 
Committed: Fri Sep 23 09:44:30 2016 +0100

--
 .../org/apache/spark/api/python/PythonRDD.scala | 42 +++-
 python/pyspark/context.py   |  5 +--
 2 files changed, 25 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/90d57542/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index d841091..0ca91b9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.api.python
 import java.io._
 import java.net._
 import java.nio.charset.StandardCharsets
-import java.util.{ArrayList => JArrayList, Collections, List => JList, Map => 
JMap}
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -38,7 +38,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.input.PortableDataStream
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util._
 
 
 private[spark] class PythonRDD(
@@ -75,7 +75,7 @@ private[spark] case class PythonFunction(
 pythonExec: String,
 pythonVer: String,
 broadcastVars: JList[Broadcast[PythonBroadcast]],
-accumulator: Accumulator[JList[Array[Byte]]])
+accumulator: PythonAccumulatorV2)
 
 /**
  * A wrapper for chained Python functions (from bottom to top).
@@ -200,7 +200,7 @@ private[spark] class PythonRunner(
 val updateLen = stream.readInt()
 val update = new Array[Byte](updateLen)
 stream.readFully(update)
-accumulator += Collections.singletonList(update)
+accumulator.add(update)
   }
   // Check whether the worker is ready to be re-used.
   if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
@@ -461,7 +461,7 @@ private[spark] object PythonRDD extends Logging {
   JavaRDD[Array[Byte]] = {
 val file = new DataInputStream(new FileInputStream(filename))
 try {
-  val objs = new collection.mutable.ArrayBuffer[Array[Byte]]
+  val objs = new mutable.ArrayBuffer[Array[Byte]]
   try {
 while (true) {
   val length = file.readInt()
@@ -866,11 +866,13 @@ class BytesToString extends 
org.apache.spark.api.java.function.Function[Array[By
 }
 
 /**
- * Internal class that acts as an `AccumulatorParam` for Python accumulators. 
Inside, it
+ * Internal class that acts as an `AccumulatorV2` for Python accumulators. 
Inside, it
  * collects a list of pickled strings that we pass to Python through a socket.
  */
-private class PythonAccumulatorParam(@transient private val serverHost: 
String, serverPort: Int)
-  extends AccumulatorParam[JList[Array[Byte]]] {
+private[spark] class PythonAccumulatorV2(
+@transient private val serverHost: String,
+private val serverPort: Int)
+  extends CollectionAccumulator[Array[Byte]] {
 
   Utils.checkHost(serverHost, "Expected hostname")
 
@@ -880,30 +882,33 @@ private class PythonAccumulatorParam(@transient private 
val serverHost: String,
* We try to reuse a single Socket to transfer accumulator updates, as they 
are all added
* by the DAGScheduler's single-threaded RpcEndpoint anyway.
*/
-  @transient var socket: Socket = _
+  @transient private var socket: Socket = _
 
-  def openSocket(): Socket = synchronized {
+  private def openSocket(): Socket = 

spark git commit: [BUILD] Closes some stale PRs

2016-09-23 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 62ccf27ab -> 5c5396cb4


[BUILD] Closes some stale PRs

## What changes were proposed in this pull request?

This PR proposes to close some stale PRs and ones suggested to be closed by 
committer(s).

Closes #12415
Closes #14765
Closes #15118
Closes #15184
Closes #15183
Closes #9440
Closes #15023
Closes #14643
Closes #14827

## How was this patch tested?

N/A

Author: hyukjinkwon 

Closes #15198 from HyukjinKwon/stale-prs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c5396cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c5396cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c5396cb

Branch: refs/heads/master
Commit: 5c5396cb4725ba5ceee26ed885e8b941d219757b
Parents: 62ccf27
Author: hyukjinkwon 
Authored: Fri Sep 23 09:41:50 2016 +0100
Committer: Sean Owen 
Committed: Fri Sep 23 09:41:50 2016 +0100

--

--






spark git commit: [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry

2016-09-23 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 54d4eee51 -> d3f90e71a


[SPARK-17640][SQL] Avoid using -1 as the default batchId for 
FileStreamSource.FileEntry

## What changes were proposed in this pull request?

Avoid using -1 as the default batchId for FileStreamSource.FileEntry so that we 
can make sure we never write any FileEntry(..., batchId = -1) into the log. This 
also keeps people from misusing it in the future (#15203 is an example).

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu 

Closes #15206 from zsxwing/cleanup.

(cherry picked from commit 62ccf27ab4b55e734646678ae78b7e812262d14b)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3f90e71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3f90e71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3f90e71

Branch: refs/heads/branch-2.0
Commit: d3f90e71af57162afc0648adbc52b810a883ceac
Parents: 54d4eee
Author: Shixiong Zhu 
Authored: Thu Sep 22 23:35:08 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Sep 22 23:35:15 2016 -0700

--
 .../execution/streaming/FileStreamSource.scala  | 37 ++--
 .../streaming/FileStreamSourceSuite.scala   | 24 ++---
 2 files changed, 31 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d3f90e71/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 4515f9a..8c3e718 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -59,7 +59,7 @@ class FileStreamSource(
   val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
 
   metadataLog.allFiles().foreach { entry =>
-seenFiles.add(entry)
+seenFiles.add(entry.path, entry.timestamp)
   }
   seenFiles.purge()
 
@@ -73,14 +73,16 @@ class FileStreamSource(
*/
   private def fetchMaxOffset(): LongOffset = synchronized {
 // All the new files found - ignore aged files and files that we have seen.
-val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)
+val newFiles = fetchAllFiles().filter {
+  case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
+}
 
 // Obey user's setting to limit the number of files in this batch trigger.
 val batchFiles =
   if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else 
newFiles
 
 batchFiles.foreach { file =>
-  seenFiles.add(file)
+  seenFiles.add(file._1, file._2)
   logDebug(s"New file: $file")
 }
 val numPurged = seenFiles.purge()
@@ -95,7 +97,9 @@ class FileStreamSource(
 
 if (batchFiles.nonEmpty) {
   maxBatchId += 1
-  metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = 
maxBatchId)).toArray)
+  metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) =>
+FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId)
+  }.toArray)
   logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} 
new files")
 }
 
@@ -140,12 +144,12 @@ class FileStreamSource(
   /**
* Returns a list of files found, sorted by their timestamp.
*/
-  private def fetchAllFiles(): Seq[FileEntry] = {
+  private def fetchAllFiles(): Seq[(String, Long)] = {
 val startTime = System.nanoTime
 val globbedPaths = 
SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
 val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, 
Some(new StructType))
 val files = catalog.allFiles().sortBy(_.getModificationTime).map { status 
=>
-  FileEntry(status.getPath.toUri.toString, status.getModificationTime)
+  (status.getPath.toUri.toString, status.getModificationTime)
 }
 val endTime = System.nanoTime
 val listingTimeMs = (endTime.toDouble - startTime) / 1000000
@@ -172,10 +176,7 @@ object FileStreamSource {
   /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
   type Timestamp = Long
 
-  val NOT_SET = -1L
-
-  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = 
NOT_SET)
-extends Serializable
+  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) 
extends Serializable
 
   /**
* A custom hash map used to track the list of files seen. This map is not 
thread-safe.
@@ -196,10 +197,10 @@ object 

spark git commit: [SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry

2016-09-23 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 947b8c6e3 -> 62ccf27ab


[SPARK-17640][SQL] Avoid using -1 as the default batchId for 
FileStreamSource.FileEntry

## What changes were proposed in this pull request?

Avoid using -1 as the default batchId for FileStreamSource.FileEntry so that we 
can make sure we never write any FileEntry(..., batchId = -1) into the log. This 
also keeps people from misusing it in the future (#15203 is an example).

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu 

Closes #15206 from zsxwing/cleanup.
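A tiny sketch of the effect: with no default on `batchId`, every construction site has to state it explicitly, so a sentinel like -1 can no longer slip into the metadata log unnoticed. The case class mirrors the one in the diff; the surrounding values are illustrative:

```
case class FileEntry(path: String, timestamp: Long, batchId: Long) extends Serializable

val maxBatchId = 3L
val batchFiles = Seq(("hdfs://host/data/a.json", 1474670000000L))  // (path, modification time)

// batchId must be supplied; FileEntry(path, ts) alone no longer compiles.
val entries = batchFiles.map { case (path, ts) =>
  FileEntry(path = path, timestamp = ts, batchId = maxBatchId)
}
```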


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62ccf27a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62ccf27a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62ccf27a

Branch: refs/heads/master
Commit: 62ccf27ab4b55e734646678ae78b7e812262d14b
Parents: 947b8c6
Author: Shixiong Zhu 
Authored: Thu Sep 22 23:35:08 2016 -0700
Committer: Shixiong Zhu 
Committed: Thu Sep 22 23:35:08 2016 -0700

--
 .../execution/streaming/FileStreamSource.scala  | 37 ++--
 .../streaming/FileStreamSourceSuite.scala   | 24 ++---
 2 files changed, 31 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/62ccf27a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 5ebc083..be02327 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -59,7 +59,7 @@ class FileStreamSource(
   val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
 
   metadataLog.allFiles().foreach { entry =>
-seenFiles.add(entry)
+seenFiles.add(entry.path, entry.timestamp)
   }
   seenFiles.purge()
 
@@ -73,14 +73,16 @@ class FileStreamSource(
*/
   private def fetchMaxOffset(): LongOffset = synchronized {
 // All the new files found - ignore aged files and files that we have seen.
-val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)
+val newFiles = fetchAllFiles().filter {
+  case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
+}
 
 // Obey user's setting to limit the number of files in this batch trigger.
 val batchFiles =
   if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else 
newFiles
 
 batchFiles.foreach { file =>
-  seenFiles.add(file)
+  seenFiles.add(file._1, file._2)
   logDebug(s"New file: $file")
 }
 val numPurged = seenFiles.purge()
@@ -95,7 +97,9 @@ class FileStreamSource(
 
 if (batchFiles.nonEmpty) {
   maxBatchId += 1
-  metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = 
maxBatchId)).toArray)
+  metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) =>
+FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId)
+  }.toArray)
   logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} 
new files")
 }
 
@@ -140,12 +144,12 @@ class FileStreamSource(
   /**
* Returns a list of files found, sorted by their timestamp.
*/
-  private def fetchAllFiles(): Seq[FileEntry] = {
+  private def fetchAllFiles(): Seq[(String, Long)] = {
 val startTime = System.nanoTime
 val globbedPaths = 
SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
 val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, 
Some(new StructType))
 val files = catalog.allFiles().sortBy(_.getModificationTime).map { status 
=>
-  FileEntry(status.getPath.toUri.toString, status.getModificationTime)
+  (status.getPath.toUri.toString, status.getModificationTime)
 }
 val endTime = System.nanoTime
 val listingTimeMs = (endTime.toDouble - startTime) / 1000000
@@ -172,10 +176,7 @@ object FileStreamSource {
   /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
   type Timestamp = Long
 
-  val NOT_SET = -1L
-
-  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = 
NOT_SET)
-extends Serializable
+  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) 
extends Serializable
 
   /**
* A custom hash map used to track the list of files seen. This map is not 
thread-safe.
@@ -196,10 +197,10 @@ object FileStreamSource {
 private var lastPurgeTimestamp: Timestamp = 0L
 
 /** Add a new file to the map. */
-def add(file: