[GitHub] [spark] LucaCanali commented on a change in pull request #31367: [SPARK-34265][PYTHON][SQL] Instrument Python UDF using SQL Metrics

2021-02-02 Thread GitBox


LucaCanali commented on a change in pull request #31367:
URL: https://github.com/apache/spark/pull/31367#discussion_r568740939



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonSQLMetrics.scala
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetrics
+
+private[python] trait PythonSQLMetrics extends SparkPlan {
+  override val metrics = Map(

Review comment:
   My original thought here was to use a trait so that we can reuse the metrics
definition across the many physical plans that need it (BatchEvalPythonExec,
ArrowEvalPythonExec, AggregateInPandasExec, etc.). Other implementations can work
too. I see that `ShuffledRowRDD` uses a custom object `SQLShuffleMetricsReporter`
to define and handle the metrics. Could you please elaborate a bit more on your
proposal?
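   To illustrate, a minimal sketch of what I mean by the trait-based approach
(the metric keys and descriptions below are placeholders for illustration, not
necessarily the ones this PR uses):

```scala
// Sketch only: a trait mixed into the Python exec nodes so they all expose the
// same SQLMetrics. Metric keys/descriptions here are illustrative placeholders.
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

private[python] trait PythonSQLMetrics extends SparkPlan {
  override val metrics: Map[String, SQLMetric] = Map(
    "pythonDataSent" -> SQLMetrics.createSizeMetric(sparkContext, "data sent to Python workers"),
    "pythonDataReceived" -> SQLMetrics.createSizeMetric(sparkContext, "data returned from Python workers"),
    "pythonNumRowsReceived" -> SQLMetrics.createMetric(sparkContext, "number of output rows")
  )
}

// Each physical plan then only needs to mix the trait in, e.g.
// case class BatchEvalPythonExec(...) extends EvalPythonExec with PythonSQLMetrics
```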





[GitHub] [spark] LucaCanali commented on a change in pull request #31367: [SPARK-34265][PYTHON][SQL] Instrument Python UDF using SQL Metrics

2021-01-29 Thread GitBox


LucaCanali commented on a change in pull request #31367:
URL: https://github.com/apache/spark/pull/31367#discussion_r566863847



##
File path: python/pyspark/sql/tests/test_pandas_sqlmetrics.py
##
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.sql.functions import pandas_udf
+from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \
+    pandas_requirement_message, pyarrow_requirement_message
+
+
+@unittest.skipIf(
+    not have_pandas or not have_pyarrow,
+    pandas_requirement_message or pyarrow_requirement_message)  # type: ignore[arg-type]
+class PandasSQLMetrics(ReusedSQLTestCase):

Review comment:
   Good point. I am not sure how to do a Scala-side test for a Python UDF, though.
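   If it helps, a rough sketch of what the Scala-side assertion could look like
once a DataFrame that calls a Python UDF exists (how to build such a DataFrame
from a Scala test is exactly the part I am unsure about; the metric name below is
just a placeholder):

```scala
// Rough sketch only: inspect the SQL metrics declared on the Python exec node of
// an executed plan. Assumes `df` is a DataFrame whose query invokes a Python UDF.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.python.BatchEvalPythonExec

def pythonMetricNames(df: DataFrame): Set[String] = {
  df.collect()  // run the query so the metrics are populated
  df.queryExecution.executedPlan.collect {
    case p: BatchEvalPythonExec => p.metrics.keySet
  }.flatten.toSet
}

// e.g. assert(pythonMetricNames(df).contains("pythonNumRowsReceived"))  // placeholder name
```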





[GitHub] [spark] LucaCanali commented on a change in pull request #31367: [SPARK-34265][PYTHON][SQL] Instrument Python UDF using SQL Metrics

2021-01-29 Thread GitBox


LucaCanali commented on a change in pull request #31367:
URL: https://github.com/apache/spark/pull/31367#discussion_r566734443



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala
##
@@ -71,10 +77,19 @@ private[python] trait PythonArrowOutput { self: BasePythonRunner[_, ColumnarBatc
 }
 try {
   if (reader != null && batchLoaded) {
+val bytesReadStart = reader.bytesRead()
+val startTime = System.nanoTime()
 batchLoaded = reader.loadNextBatch()
+val deltaTime = System.nanoTime() - startTime

Review comment:
   Good point. From what I can see by experimenting, this call actually triggers
execution, so the time measured here seems to be a relevant value for the
execution time. Ideally we would measure on the Python side, but this seems
good enough?
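   As a side note, here is roughly how the measured delta could feed a timing
SQLMetric, assuming `SQLMetrics.createNanoTimingMetric` is available (otherwise
`createTimingMetric` plus a nanosecond-to-millisecond conversion would do); the
metric key and description are placeholders:

```scala
// Sketch only: the metric would be declared on the SparkPlan (where a
// SparkContext is available) and handed to the runner; "pythonExecTime" and its
// description are placeholder names, not necessarily what this PR uses.
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

def createPythonExecTimeMetric(sc: SparkContext): SQLMetric =
  SQLMetrics.createNanoTimingMetric(sc, "time to run Python workers")

// The timing pattern used in this diff, factored out for clarity:
def timed[T](metric: SQLMetric)(body: => T): T = {
  val startTime = System.nanoTime()
  val result = body                          // e.g. reader.loadNextBatch()
  metric += System.nanoTime() - startTime    // SQLMetric defines += as add()
  result
}
```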





[GitHub] [spark] LucaCanali commented on a change in pull request #31367: [SPARK-34265][PYTHON][SQL] Instrument Python UDF using SQL Metrics

2021-01-29 Thread GitBox


LucaCanali commented on a change in pull request #31367:
URL: https://github.com/apache/spark/pull/31367#discussion_r566732268



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
##
@@ -83,12 +97,19 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
   val unpickledBatch = unpickle.loads(pickedResult)
   unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
 }.map { result =>
+  pythonNumRowsReceived += 1
+  val startTime = System.nanoTime()
   if (udfs.length == 1) {
 // fast path for single UDF
 mutableRow(0) = fromJava(result)
+val deltaTime = System.nanoTime() - startTime
+pythonExecTime += deltaTime
 mutableRow
   } else {
-fromJava(result).asInstanceOf[InternalRow]
+val res = fromJava(result).asInstanceOf[InternalRow]
+val deltaTime = System.nanoTime() - startTime
+pythonExecTime += deltaTime

Review comment:
   Correct, this is just one component, and a small one that could be omitted.
The most important part of the execution time for BatchEvalPythonExec is
measured in PythonUDFRunner.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org