Re: Pyspark: Issue using sql in foreachBatch sink

2020-07-31 Thread Jungtaek Lim
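Python doesn't allow omitting the () on a call with no parameters, whereas
Scala does. Here `outdf` comes from `bdf._jdf`, so it is a py4j proxy for the
JVM Dataset, and `write` is a method on it. Use `write()`, not `write`.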
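
To illustrate the point in plain Python, here is a minimal sketch. FakeJavaDataset and FakeWriter are hypothetical stand-ins invented for this example (they are not py4j or PySpark classes); they only mimic an object on which `write` is a zero-argument method rather than a property.

```python
class FakeWriter:
    """Hypothetical stand-in for a writer object with a chainable format()."""

    def format(self, source):
        self.source = source
        return self


class FakeJavaDataset:
    """Hypothetical stand-in for a proxied object whose `write` is a method."""

    def write(self):
        return FakeWriter()


ds = FakeJavaDataset()

# Without parentheses, `ds.write` is the method object itself, not a writer,
# so looking up `.format` on it raises AttributeError.
try:
    ds.write.format("csv")
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

# With parentheses, the method is actually called and returns the writer.
writer = ds.write().format("csv")
```

By the same reasoning, the proxied object in the question would need `outdf.coalesce(1).write().format("csv")...` with the parentheses spelled out.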

On Wed, Jul 29, 2020 at 9:09 AM muru wrote:

> In a PySpark Structured Streaming job, using SQL instead of the SQL
> functions in a foreachBatch sink throws an
> AttributeError: 'JavaMember' object has no attribute 'format' exception.
> However, the same thing works with the Scala API.
>
> Please note, I tested in spark 2.4.5/2.4.6 and 3.0.0 and got the same
> exception.
> Is it a bug or known issue with Pyspark implementation? I noticed that I
> could perform other operations except the write method.
>
> Please, let me know how to fix this issue.
>
> See below code examples
> # Spark Scala method
> def processData(batchDF: DataFrame, batchId: Long) {
>   batchDF.createOrReplaceTempView("tbl")
>   val outdf = batchDF.sparkSession.sql("select action, count(*) as count from tbl where date='2020-06-20' group by 1")
>   outdf.printSchema()
>   outdf.show
>   outdf.coalesce(1).write.format("csv").save("/tmp/agg")
> }
>
> ## pyspark python method
> def process_data(bdf, bid):
>   lspark = bdf._jdf.sparkSession()
>   bdf.createOrReplaceTempView("tbl")
>   outdf = lspark.sql("select action, count(*) as count from tbl where date='2020-06-20' group by 1")
>   outdf.printSchema()
>   # it works
>   outdf.show()
>   # throws AttributeError: 'JavaMember' object has no attribute 'format' exception
>   outdf.coalesce(1).write.format("csv").save("/tmp/agg1")
>
> Here is the full exception
> 20/07/24 16:31:24 ERROR streaming.MicroBatchExecution: Query [id = 854a39d0-b944-4b52-bf05-cacf998e2cbd, runId = e3d4dc7d-80e1-4164-8310-805d7713fc96] terminated with error
> py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
>   File "/Users/muru/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 2381, in _call_proxy
>     return_value = getattr(self.pool[obj_id], method)(*params)
>   File "/Users/muru/spark/python/pyspark/sql/utils.py", line 191, in call
>     raise e
> AttributeError: 'JavaMember' object has no attribute 'format'
>   at py4j.Protocol.getReturnValue(Protocol.java:473)
>   at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
>   at com.sun.proxy.$Proxy20.call(Unknown Source)
>   at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
>   at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
>   at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
>   at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)

Re: Tab delimited csv import and empty columns

2020-07-31 Thread Vladimir Ryzhov
Would *df.na.fill("")* do the trick?

On Fri, Jul 31, 2020 at 8:43 AM Sean Owen wrote:

> Try setting nullValue to anything besides the empty string. Because its
> default is the empty string, empty strings become null by default.
>
> On Fri, Jul 31, 2020 at 3:20 AM Stephen Coy 
> wrote:
>
>> That does not work.
>>
>> This is Spark 3.0 by the way.
>>
>> I have been looking at the Spark unit tests and there does not seem to be
>> any that load a CSV text file and verify that an empty string maps to an
>> empty string which I think is supposed to be the default behaviour because
>> the "nullValue" option defaults to "".
>>
>> Thanks anyway
>>
>> Steve C
>>
>> On 30 Jul 2020, at 10:01 pm, German Schiavon Matteo <
>> gschiavonsp...@gmail.com> wrote:
>>
>> Hey,
>>
>> I understand that your empty values in your CSV are "" , if so, try this
>> option:
>>
>> *.option("emptyValue", "\"\"")*
>>
>> Hope it helps
>>
>> On Thu, 30 Jul 2020 at 08:49, Stephen Coy 
>> wrote:
>>
>>> Hi there,
>>>
>>> I’m trying to import a tab delimited file with:
>>>
>>> Dataset<Row> catalogData = sparkSession
>>>   .read()
>>>   .option("sep", "\t")
>>>   .option("header", "true")
>>>   .csv(args[0])
>>>   .cache();
>>>
>>>
>>> This works great, except for the fact that any column that is empty is
>>> given the value null, when I need these values to be literal empty strings.
>>>
>>> Is there any option combination that will achieve this?
>>>
>>> Thanks,
>>>
>>> Steve C
>>>
>>>
>>> 
>>> This email contains confidential information of and is the copyright of
>>> Infomedia. It must not be forwarded, amended or disclosed without consent
>>> of the sender. If you received this message by mistake, please advise the
>>> sender and delete all copies. Security of transmission on the internet
>>> cannot be guaranteed, could be infected, intercepted, or corrupted and you
>>> should ensure you have suitable antivirus protection in place. By sending
>>> us your or any third party personal details, you consent to (or confirm you
>>> have obtained consent from such third parties) to Infomedia’s privacy
>>> policy. http://www.infomedia.com.au/privacy-policy/
>>>
>>
>>


Re: Tab delimited csv import and empty columns

2020-07-31 Thread Sean Owen
Try setting nullValue to anything besides the empty string. Because its
default is the empty string, empty strings become null by default.
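
For illustration only, here is a toy model of that rule written with Python's standard csv module. It is not Spark's CSV reader: read_tsv, its null_value parameter, the sample data, and the "\N" marker are all hypothetical names and values made up for this sketch. It only shows why a null marker equal to the empty string turns empty fields into nulls, and why a different marker leaves them as empty strings.

```python
import csv
import io


def read_tsv(text, null_value=""):
    """Toy reader: any field equal to null_value is returned as None."""
    reader = csv.reader(io.StringIO(text), delimiter="\t")
    header = next(reader)
    rows = []
    for fields in reader:
        rows.append({
            name: (None if value == null_value else value)
            for name, value in zip(header, fields)
        })
    return rows


# Hypothetical tab-delimited sample with an empty field in each data row.
sample = "id\tname\tnote\n1\twidget\t\n2\t\tspare\n"

# Null marker is the empty string: empty fields come back as None.
default_rows = read_tsv(sample)

# Null marker is something else: empty fields stay as empty strings.
custom_rows = read_tsv(sample, null_value="\\N")
```

In the Java code from the original question, the corresponding change would be an extra .option("nullValue", ...) call with a marker that never occurs in the data.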

On Fri, Jul 31, 2020 at 3:20 AM Stephen Coy wrote:

> That does not work.
>
> This is Spark 3.0 by the way.
>
> I have been looking at the Spark unit tests and there does not seem to be
> any that load a CSV text file and verify that an empty string maps to an
> empty string which I think is supposed to be the default behaviour because
> the "nullValue" option defaults to "".
>
> Thanks anyway
>
> Steve C
>
> On 30 Jul 2020, at 10:01 pm, German Schiavon Matteo <
> gschiavonsp...@gmail.com> wrote:
>
> Hey,
>
> I understand that your empty values in your CSV are "" , if so, try this
> option:
>
> *.option("emptyValue", "\"\"")*
>
> Hope it helps
>
> On Thu, 30 Jul 2020 at 08:49, Stephen Coy 
> wrote:
>
>> Hi there,
>>
>> I’m trying to import a tab delimited file with:
>>
>> Dataset<Row> catalogData = sparkSession
>>   .read()
>>   .option("sep", "\t")
>>   .option("header", "true")
>>   .csv(args[0])
>>   .cache();
>>
>>
>> This works great, except for the fact that any column that is empty is
>> given the value null, when I need these values to be literal empty strings.
>>
>> Is there any option combination that will achieve this?
>>
>> Thanks,
>>
>> Steve C
>>
>
>


Re: Tab delimited csv import and empty columns

2020-07-31 Thread Stephen Coy
That does not work.

This is Spark 3.0 by the way.

I have been looking at the Spark unit tests and there does not seem to be any
that load a CSV text file and verify that an empty string maps to an empty
string which I think is supposed to be the default behaviour because the
"nullValue" option defaults to "".

Thanks anyway

Steve C

On 30 Jul 2020, at 10:01 pm, German Schiavon Matteo <gschiavonsp...@gmail.com> wrote:

Hey,

I understand that your empty values in your CSV are "" , if so, try this option:

.option("emptyValue", "\"\"")

Hope it helps

On Thu, 30 Jul 2020 at 08:49, Stephen Coy <s...@infomedia.com.au.invalid> wrote:
Hi there,

I’m trying to import a tab delimited file with:

Dataset<Row> catalogData = sparkSession
  .read()
  .option("sep", "\t")
  .option("header", "true")
  .csv(args[0])
  .cache();

This works great, except for the fact that any column that is empty is given 
the value null, when I need these values to be literal empty strings.

Is there any option combination that will achieve this?

Thanks,

Steve C

