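Python doesn't let you omit the `()` when calling a zero-argument method, whereas Scala does. Because `lspark` is the underlying Java `SparkSession` (obtained via `bdf._jdf.sparkSession()`), `outdf` is a py4j proxy for the Java Dataset, not a PySpark `DataFrame`. On that proxy, `outdf.coalesce(1).write` without parentheses is only a reference to the Java method (a `JavaMember`), which has no `format` attribute. Your `show()` and `printSchema()` calls worked because you wrote them with parentheses. Use `write()`, not `write`, e.g. `outdf.coalesce(1).write().format("csv").save("/tmp/agg1")`.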
On Wed, Jul 29, 2020 at 9:09 AM muru <mmur...@gmail.com> wrote: > In a pyspark SS job, trying to use sql instead of sql functions in > foreachBatch sink > throws AttributeError: 'JavaMember' object has no attribute 'format' > exception. > However, the same thing works in Scala API. > > Please note, I tested in spark 2.4.5/2.4.6 and 3.0.0 and got the same > exception. > Is it a bug or known issue with Pyspark implementation? I noticed that I > could perform other operations except the write method. > > Please, let me know how to fix this issue. > > See below code examples > # Spark Scala method > def processData(batchDF: DataFrame, batchId: Long) { > batchDF.createOrReplaceTempView("tbl") > val outdf=batchDF.sparkSession.sql("select action, count(*) as count > from tbl where date='2020-06-20' group by 1") > outdf.printSchema() > outdf.show > outdf.coalesce(1).write.format("csv").save("/tmp/agg") > } > > ## pyspark python method > def process_data(bdf, bid): > lspark = bdf._jdf.sparkSession() > bdf.createOrReplaceTempView("tbl") > outdf=lspark.sql("select action, count(*) as count from tbl where > date='2020-06-20' group by 1") > outdf.printSchema() > # it works > outdf.show() > # throws AttributeError: 'JavaMember' object has no attribute 'format' > exception > outdf.coalesce(1).write.format("csv").save("/tmp/agg1") > > Here is the full exception > 20/07/24 16:31:24 ERROR streaming.MicroBatchExecution: Query [id = > 854a39d0-b944-4b52-bf05-cacf998e2cbd, runId = > e3d4dc7d-80e1-4164-8310-805d7713fc96] terminated with error > py4j.Py4JException: An exception was raised by the Python Proxy. 
Return > Message: Traceback (most recent call last): > File > "/Users/muru/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", > line 2381, in _call_proxy > return_value = getattr(self.pool[obj_id], method)(*params) > File "/Users/muru/spark/python/pyspark/sql/utils.py", line 191, in call > raise e > AttributeError: 'JavaMember' object has no attribute 'format' > at py4j.Protocol.getReturnValue(Protocol.java:473) > at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108) > at com.sun.proxy.$Proxy20.call(Unknown Source) > at > org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55) > at > org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55) > at > org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537) > at > org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80) > at > org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org > 
$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) > at org.apache.spark.sql.execution.streaming.StreamExecution.org > $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193) > >