Re: Pyspark: Issue using sql in foreachBatch sink

2020-08-03 Thread muru
Thanks Jungtaek for your help.

On Fri, Jul 31, 2020 at 6:31 PM Jungtaek Lim wrote:

> Python doesn't let you omit the empty parentheses on a no-argument
> method call the way Scala does. Use `write()`, not `write`.

Re: Pyspark: Issue using sql in foreachBatch sink

2020-07-31 Thread Jungtaek Lim
Python doesn't let you omit the empty parentheses on a no-argument method
call the way Scala does. Use `write()`, not `write`.
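
For example, here is a minimal sketch of the fix. It keeps the original
post's bdf._jdf.sparkSession() approach, so outdf is a py4j proxy for the
Java DataFrame and every no-arg method, including write, must actually be
called:

def process_data(bdf, bid):
    # py4j handle to the *Java* SparkSession; everything derived from it
    # is a JavaObject proxy, not a pyspark DataFrame
    lspark = bdf._jdf.sparkSession()
    bdf.createOrReplaceTempView("tbl")
    outdf = lspark.sql("select action, count(*) as count from tbl where date='2020-06-20' group by 1")
    # outdf.write without parentheses is only a JavaMember (a bound method
    # reference), which is why .format() raised AttributeError above
    outdf.coalesce(1).write().format("csv").save("/tmp/agg1")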


Pyspark: Issue using sql in foreachBatch sink

2020-07-28 Thread muru
In a PySpark Structured Streaming job, using SQL (rather than the SQL
functions API) in a foreachBatch sink throws an AttributeError:
'JavaMember' object has no attribute 'format' exception.
However, the same thing works in the Scala API.

Note that I tested Spark 2.4.5, 2.4.6, and 3.0.0 and got the same
exception. Is this a bug or a known issue in the PySpark implementation?
Every other operation works; only the write method fails.

Please let me know how to fix this issue.

See the code examples below:
# Spark Scala method
def processData(batchDF: DataFrame, batchId: Long) {
  batchDF.createOrReplaceTempView("tbl")
  val outdf = batchDF.sparkSession.sql("select action, count(*) as count from tbl where date='2020-06-20' group by 1")
  outdf.printSchema()
  outdf.show
  outdf.coalesce(1).write.format("csv").save("/tmp/agg")
}

## pyspark python method
def process_data(bdf, bid):
    lspark = bdf._jdf.sparkSession()
    bdf.createOrReplaceTempView("tbl")
    outdf = lspark.sql("select action, count(*) as count from tbl where date='2020-06-20' group by 1")
    outdf.printSchema()
    # it works
    outdf.show()
    # throws AttributeError: 'JavaMember' object has no attribute 'format' exception
    outdf.coalesce(1).write.format("csv").save("/tmp/agg1")

Here is the full exception:
20/07/24 16:31:24 ERROR streaming.MicroBatchExecution: Query [id = 854a39d0-b944-4b52-bf05-cacf998e2cbd, runId = e3d4dc7d-80e1-4164-8310-805d7713fc96] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/Users/muru/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 2381, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/Users/muru/spark/python/pyspark/sql/utils.py", line 191, in call
    raise e
AttributeError: 'JavaMember' object has no attribute 'format'
at py4j.Protocol.getReturnValue(Protocol.java:473)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy20.call(Unknown Source)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
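
The 'JavaMember' in this trace is py4j's wrapper for a Java method that
was referenced but never invoked. Besides adding the parentheses as
suggested above, a sketch of an alternative is to stay in the Python API
entirely, so that write is a property and needs no parentheses. This
assumes DataFrame.sql_ctx.sparkSession is available (it is in 2.4.x and
3.0.0, though it leans on semi-internal attributes):

def process_data(bdf, bid):
    # Python-level SparkSession rather than the py4j proxy
    lspark = bdf.sql_ctx.sparkSession
    bdf.createOrReplaceTempView("tbl")
    outdf = lspark.sql("select action, count(*) as count from tbl where date='2020-06-20' group by 1")
    # outdf is a regular pyspark DataFrame, so .write is a property and
    # the Scala-style chain works unchanged
    outdf.coalesce(1).write.format("csv").save("/tmp/agg1")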