EnricoMi commented on a change in pull request #33545:
URL: https://github.com/apache/spark/pull/33545#discussion_r679141050



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##########
@@ -78,20 +82,35 @@ class Observation(name: String) {
    * Get the observed metrics. This waits for the observed dataset to finish its first action.
    * Only the result of the first action is available. Subsequent actions do not modify the result.
    *
-   * @return the observed metrics as a [[Row]]
+   * @return the observed metrics as a `Map[String, Any]`
    * @throws InterruptedException interrupted while waiting
    */
   @throws[InterruptedException]
-  def get: Row = {
+  def get: Map[String, Any] = {
     synchronized {
       // we need to loop as wait might return without us calling notify
       // https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610
-      while (this.row.isEmpty) {
+      while (this.metrics.isEmpty) {
         wait()
       }
     }
 
-    this.row.get
+    this.metrics.get
+  }
+
+  /**
+   * (Java-specific)
+   * Get the observed metrics. This waits for the observed dataset to finish its first action.
+   * Only the result of the first action is available. Subsequent actions do not modify the result.
+   *

Review comment:
       applied

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##########
@@ -78,20 +82,35 @@ class Observation(name: String) {
    * Get the observed metrics. This waits for the observed dataset to finish its first action.
    * Only the result of the first action is available. Subsequent actions do not modify the result.
    *

Review comment:
       added

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala
##########
@@ -78,20 +82,35 @@ class Observation(name: String) {
    * Get the observed metrics. This waits for the observed dataset to finish its first action.
    * Only the result of the first action is available. Subsequent actions do not modify the result.
    *
-   * @return the observed metrics as a [[Row]]
+   * @return the observed metrics as a `Map[String, Any]`
    * @throws InterruptedException interrupted while waiting
    */
   @throws[InterruptedException]
-  def get: Row = {
+  def get: Map[String, Any] = {
     synchronized {
       // we need to loop as wait might return without us calling notify
       // https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610
-      while (this.row.isEmpty) {
+      while (this.metrics.isEmpty) {
         wait()
       }
     }
 
-    this.row.get
+    this.metrics.get
+  }
+
+  /**
+   * (Java-specific)
+   * Get the observed metrics. This waits for the observed dataset to finish its first action.
+   * Only the result of the first action is available. Subsequent actions do not modify the result.
+   *
+   * @return the observed metrics as a `java.util.Map[String, Object]`
+   * @throws InterruptedException interrupted while waiting
+   */
+  @throws[InterruptedException]
+  def getAsJavaMap: java.util.Map[String, Object] = {

Review comment:
       changed

##########
File path: python/pyspark/sql/observation.py
##########
@@ -112,12 +115,13 @@ def get(self):
 
         Returns
         -------
-        :class:`Row`
+        :class:`Dict`
             the observed metrics
         """
         assert self._jo is not None, 'call DataFrame.observe'
-        jrow = self._jo.get()
-        return self._to_row(jrow)
+        jmap = self._jo.getAsJavaMap()

Review comment:
       done

##########
File path: sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
##########
@@ -391,6 +390,60 @@ public void testGroupBy() {
     Assert.assertEquals(asSet("1a#2", "3foobar#6", "5#10"), toSet(cogrouped.collectAsList()));
   }
 
+  @Test
+  public void testObservation() {
+    // SPARK-34806: tests the Observation Java API and Dataset.observe(Observation, Column, Column*)
+    Observation namedObservation = new Observation("named");
+    Observation unnamedObservation = new Observation();
+
+    Dataset<Long> df = spark
+      .range(100)
+      .observe(
+        namedObservation,
+        min(col("id")).as("min_val"),
+        max(col("id")).as("max_val"),
+        sum(col("id")).as("sum_val"),
+        count(when(pmod(col("id"), lit(2)).$eq$eq$eq(0), 1)).as("num_even")
+      )
+      .observe(
+        unnamedObservation,
+        avg(col("id")).cast("int").as("avg_val")
+      );
+
+    df.collect();
+    Map<String, Object> namedMetrics = null;
+    Map<String, Object> unnamedMetrics = null;
+
+    try {
+      namedMetrics = namedObservation.getAsJavaMap();
+      unnamedMetrics = unnamedObservation.getAsJavaMap();

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to