Repository: spark
Updated Branches:
  refs/heads/branch-2.1 e9b3afac9 -> 1c6419718


[SPARK-18754][SS] Rename recentProgresses to recentProgress

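Based on an informal survey, users find the name `recentProgress` easier to
understand and remember than `recentProgresses`. The related configuration key
`spark.sql.streaming.numRecentProgresses` is likewise renamed to
`spark.sql.streaming.numRecentProgressUpdates`.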

Author: Michael Armbrust <mich...@databricks.com>

Closes #16182 from marmbrus/renameRecentProgress.

(cherry picked from commit 70b2bf717d367d598c5a238d569d62c777e63fde)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c641971
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c641971
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c641971

Branch: refs/heads/branch-2.1
Commit: 1c6419718aadf0bdc200f9b328242062a07f2277
Parents: e9b3afa
Author: Michael Armbrust <mich...@databricks.com>
Authored: Wed Dec 7 15:36:29 2016 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Dec 7 15:36:39 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaSourceSuite.scala     |  2 +-
 project/MimaExcludes.scala                        |  2 +-
 python/pyspark/sql/streaming.py                   |  6 +++---
 python/pyspark/sql/tests.py                       |  4 ++--
 .../execution/streaming/ProgressReporter.scala    |  2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala   |  2 +-
 .../spark/sql/streaming/StreamingQuery.scala      |  4 ++--
 .../execution/streaming/ForeachSinkSuite.scala    |  4 ++--
 .../sql/streaming/FileStreamSourceSuite.scala     |  2 +-
 .../streaming/StreamingQueryListenerSuite.scala   |  4 ++--
 .../spark/sql/streaming/StreamingQuerySuite.scala | 18 +++++++++---------
 11 files changed, 25 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 0e40aba..544fbc5 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -448,7 +448,7 @@ class KafkaSourceSuite extends KafkaSourceTest {
       AddKafkaData(Set(topic), 1, 2, 3),
       CheckAnswer(2, 3, 4),
       AssertOnQuery { query =>
-        val recordsRead = query.recentProgresses.map(_.numInputRows).sum
+        val recordsRead = query.recentProgress.map(_.numInputRows).sum
         recordsRead == 3
       }
     )

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 6650aad..978a328 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -85,7 +85,7 @@ object MimaExcludes {
       
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"),
       
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
       
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"),
-      
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"),
+      
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgress"),
       
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
       
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"),
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index ee7a26d..9cfb3fe 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -114,12 +114,12 @@ class StreamingQuery(object):
 
     @property
     @since(2.1)
-    def recentProgresses(self):
+    def recentProgress(self):
         """Returns an array of the most recent [[StreamingQueryProgress]] 
updates for this query.
         The number of progress updates retained for each stream is configured 
by Spark session
-        configuration `spark.sql.streaming.numRecentProgresses`.
+        configuration `spark.sql.streaming.numRecentProgressUpdates`.
         """
-        return [json.loads(p.json()) for p in self._jsq.recentProgresses()]
+        return [json.loads(p.json()) for p in self._jsq.recentProgress()]
 
     @property
     @since(2.1)

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 66a3490..50df68b 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1116,11 +1116,11 @@ class SQLTests(ReusedPySparkTestCase):
         try:
             q.processAllAvailable()
             lastProgress = q.lastProgress
-            recentProgresses = q.recentProgresses
+            recentProgress = q.recentProgress
             status = q.status
             self.assertEqual(lastProgress['name'], q.name)
             self.assertEqual(lastProgress['id'], q.id)
-            self.assertTrue(any(p == lastProgress for p in recentProgresses))
+            self.assertTrue(any(p == lastProgress for p in recentProgress))
             self.assertTrue(
                 "message" in status and
                 "isDataAvailable" in status and

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 12d0c1e..40e3151 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -94,7 +94,7 @@ trait ProgressReporter extends Logging {
   def status: StreamingQueryStatus = currentStatus
 
   /** Returns an array containing the most recent query progress updates. */
-  def recentProgresses: Array[StreamingQueryProgress] = 
progressBuffer.synchronized {
+  def recentProgress: Array[StreamingQueryProgress] = 
progressBuffer.synchronized {
     progressBuffer.toArray
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 581f99e..0280a3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -591,7 +591,7 @@ object SQLConf {
       .createWithDefault(false)
 
   val STREAMING_PROGRESS_RETENTION =
-    SQLConfigBuilder("spark.sql.streaming.numRecentProgresses")
+    SQLConfigBuilder("spark.sql.streaming.numRecentProgressUpdates")
       .doc("The number of progress updates to retain for a streaming query")
       .intConf
       .createWithDefault(100)

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 1794e75..596bd90 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -87,11 +87,11 @@ trait StreamingQuery {
   /**
    * Returns an array of the most recent [[StreamingQueryProgress]] updates 
for this query.
    * The number of progress updates retained for each stream is configured by 
Spark session
-   * configuration `spark.sql.streaming.numRecentProgresses`.
+   * configuration `spark.sql.streaming.numRecentProgressUpdates`.
    *
    * @since 2.1.0
    */
-  def recentProgresses: Array[StreamingQueryProgress]
+  def recentProgress: Array[StreamingQueryProgress]
 
   /**
    * Returns the most recent [[StreamingQueryProgress]] update of this 
streaming query.

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 4a3eeb7..9137d65 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -263,9 +263,9 @@ class ForeachSinkSuite extends StreamTest with 
SharedSQLContext with BeforeAndAf
     try {
       inputData.addData(10, 11, 12)
       query.processAllAvailable()
-      val recentProgress = query.recentProgresses.filter(_.numInputRows != 
0).headOption
+      val recentProgress = query.recentProgress.filter(_.numInputRows != 
0).headOption
       assert(recentProgress.isDefined && recentProgress.get.numInputRows === 3,
-        s"recentProgresses[${query.recentProgresses.toList}] doesn't contain 
correct metrics")
+        s"recentProgress[${query.recentProgress.toList}] doesn't contain 
correct metrics")
     } finally {
       query.stop()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index ff1f3e2..7b6fe83 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1006,7 +1006,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         AddTextFileData("100", src, tmp),
         CheckAnswer("100"),
         AssertOnQuery { query =>
-          val actualProgress = query.recentProgresses
+          val actualProgress = query.recentProgress
               .find(_.numInputRows > 0)
               .getOrElse(sys.error("Could not find records with data."))
           assert(actualProgress.numInputRows === 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 1cd503c..b78d135 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -237,9 +237,9 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
           }
           true
         }
-        // `recentProgresses` should not receive too many no data events
+        // `recentProgress` should not receive too many no data events
         actions += AssertOnQuery { q =>
-          q.recentProgresses.size > 1 && q.recentProgresses.size <= 11
+          q.recentProgress.size > 1 && q.recentProgress.size <= 11
         }
         testStream(input.toDS)(actions: _*)
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)

http://git-wip-us.apache.org/repos/asf/spark/blob/1c641971/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 55dd1a5..7be2f21 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -152,7 +152,7 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
     )
   }
 
-  testQuietly("status, lastProgress, and recentProgresses") {
+  testQuietly("status, lastProgress, and recentProgress") {
     import StreamingQuerySuite._
     clock = new StreamManualClock
 
@@ -201,7 +201,7 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
       AssertOnQuery(_.status.message === "Waiting for next trigger"),
-      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
 
       // Test status and progress while offset is being fetched
       AddData(inputData, 1, 2),
@@ -210,7 +210,7 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
-      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
 
       // Test status and progress while batch is being fetched
       AdvanceManualClock(200), // time = 300 to unblock getOffset, will block 
on getBatch
@@ -218,14 +218,14 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
-      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
 
       // Test status and progress while batch is being processed
       AdvanceManualClock(300), // time = 600 to unblock getBatch, will block 
in Spark job
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
-      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+      AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
 
       // Test status and progress while batch processing has completed
       AdvanceManualClock(500), // time = 1100 to unblock job
@@ -236,8 +236,8 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
       AssertOnQuery(_.status.message === "Waiting for next trigger"),
       AssertOnQuery { query =>
         assert(query.lastProgress != null)
-        assert(query.recentProgresses.exists(_.numInputRows > 0))
-        assert(query.recentProgresses.last.eq(query.lastProgress))
+        assert(query.recentProgress.exists(_.numInputRows > 0))
+        assert(query.recentProgress.last.eq(query.lastProgress))
 
         val progress = query.lastProgress
         assert(progress.id === query.id)
@@ -274,7 +274,7 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
       AssertOnQuery(_.status.isTriggerActive === false),
       AssertOnQuery(_.status.message === "Waiting for next trigger"),
       AssertOnQuery { query =>
-        assert(query.recentProgresses.last.eq(query.lastProgress))
+        assert(query.recentProgress.last.eq(query.lastProgress))
         assert(query.lastProgress.batchId === 1)
         assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818)
         true
@@ -408,7 +408,7 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
     try {
       val q = 
streamingDF.writeStream.format("memory").queryName("test").start()
       q.processAllAvailable()
-      q.recentProgresses.head
+      q.recentProgress.head
     } finally {
       spark.streams.active.map(_.stop())
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to