EnricoMi commented on a change in pull request #33545: URL: https://github.com/apache/spark/pull/33545#discussion_r679141050
########## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ########## @@ -78,20 +82,35 @@ class Observation(name: String) { * Get the observed metrics. This waits for the observed dataset to finish its first action. * Only the result of the first action is available. Subsequent actions do not modify the result. * - * @return the observed metrics as a [[Row]] + * @return the observed metrics as a `Map[String, Any]` * @throws InterruptedException interrupted while waiting */ @throws[InterruptedException] - def get: Row = { + def get: Map[String, Any] = { synchronized { // we need to loop as wait might return without us calling notify // https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610 - while (this.row.isEmpty) { + while (this.metrics.isEmpty) { wait() } } - this.row.get + this.metrics.get + } + + /** + * (Java-specific) + * Get the observed metrics. This waits for the observed dataset to finish its first action. + * Only the result of the first action is available. Subsequent actions do not modify the result. + * Review comment: applied ########## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ########## @@ -78,20 +82,35 @@ class Observation(name: String) { * Get the observed metrics. This waits for the observed dataset to finish its first action. * Only the result of the first action is available. Subsequent actions do not modify the result. * Review comment: added ########## File path: sql/core/src/main/scala/org/apache/spark/sql/Observation.scala ########## @@ -78,20 +82,35 @@ class Observation(name: String) { * Get the observed metrics. This waits for the observed dataset to finish its first action. * Only the result of the first action is available. Subsequent actions do not modify the result. 
* - * @return the observed metrics as a [[Row]] + * @return the observed metrics as a `Map[String, Any]` * @throws InterruptedException interrupted while waiting */ @throws[InterruptedException] - def get: Row = { + def get: Map[String, Any] = { synchronized { // we need to loop as wait might return without us calling notify // https://en.wikipedia.org/w/index.php?title=Spurious_wakeup&oldid=992601610 - while (this.row.isEmpty) { + while (this.metrics.isEmpty) { wait() } } - this.row.get + this.metrics.get + } + + /** + * (Java-specific) + * Get the observed metrics. This waits for the observed dataset to finish its first action. + * Only the result of the first action is available. Subsequent actions do not modify the result. + * + * @return the observed metrics as a `java.util.Map[String, Object]` + * @throws InterruptedException interrupted while waiting + */ + @throws[InterruptedException] + def getAsJavaMap: java.util.Map[String, Object] = { Review comment: changed ########## File path: python/pyspark/sql/observation.py ########## @@ -112,12 +115,13 @@ def get(self): Returns ------- - :class:`Row` + :class:`Dict` the observed metrics """ assert self._jo is not None, 'call DataFrame.observe' - jrow = self._jo.get() - return self._to_row(jrow) + jmap = self._jo.getAsJavaMap() Review comment: done ########## File path: sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java ########## @@ -391,6 +390,60 @@ public void testGroupBy() { Assert.assertEquals(asSet("1a#2", "3foobar#6", "5#10"), toSet(cogrouped.collectAsList())); } + @Test + public void testObservation() { + // SPARK-34806: tests the Observation Java API and Dataset.observe(Observation, Column, Column*) + Observation namedObservation = new Observation("named"); + Observation unnamedObservation = new Observation(); + + Dataset<Long> df = spark + .range(100) + .observe( + namedObservation, + min(col("id")).as("min_val"), + max(col("id")).as("max_val"), + sum(col("id")).as("sum_val"), + 
count(when(pmod(col("id"), lit(2)).$eq$eq$eq(0), 1)).as("num_even") + ) + .observe( + unnamedObservation, + avg(col("id")).cast("int").as("avg_val") + ); + + df.collect(); + Map<String, Object> namedMetrics = null; + Map<String, Object> unnamedMetrics = null; + + try { + namedMetrics = namedObservation.getAsJavaMap(); + unnamedMetrics = unnamedObservation.getAsJavaMap(); Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org