Re: Kafka Topic to Parquet HDFS with Structured Streaming

2020-11-19 Thread AlbertoMarq
Hi Chetan
I'm having the exact same issue with Spark Structured Streaming and Kafka
trying to write to HDFS.
Can you please tell me how you fixed it?
I'm using Spark 3.0.1 and Hadoop 3.3.0

Thanks!






Re: Spark 3.0.1 new Proleptic Gregorian calendar

2020-11-19 Thread Maxim Gekk
Hello Saurabh,

>  What config options should we set,
> - if we are always going to read old data written with Spark 2.4 using
> Spark 3.0

You should set *spark.sql.legacy.parquet.datetimeRebaseModeInRead* to
*LEGACY* when you read old data.

You see this exception because Spark 3.0 cannot determine who wrote the
parquet files and which calendar was used while saving them. Starting
from version 2.4.6, Spark saves metadata to parquet files, and Spark
3.0 can infer the mode automatically.

Maxim Gekk

Software Engineer

Databricks, Inc.


On Thu, Nov 19, 2020 at 8:10 PM Saurabh Gulati
 wrote:

> Hello,
> First of all, thanks to you guys for maintaining and improving Spark.
>
> We just updated to Spark 3.0.1 and are facing some issues with the new
> Proleptic Gregorian calendar.
>
> We have data from different sources in our platform and we saw there were
> some *date/timestamp* columns that go back to years before 1500.
>
> According to this post, data written with Spark 2.4 and read with 3.0
> should result in some difference in *dates/timestamps*, but we are not
> able to replicate this issue. We only encounter an exception that suggests
> we set the *spark.sql.legacy.parquet.datetimeRebaseModeInRead/Write*
> config options to make it work.
>
> So, our main concerns are:
>
>- How can we test/replicate this behavior? Since it's not very clear
>to us, nor do we see any docs for this change, we can't decide with
>certainty which parameters to set and why.
>- What config options should we set,
>   - if we are always going to read old data written with Spark 2.4
>   using Spark 3.0
>   - if we will always be writing newer data with Spark 3.0.
>
> We couldn't make a deterministic/informed choice, so we thought it better
> to ask the community which scenarios will be impacted and what will still
> work fine.
>
> Thanks
> Saurabh
>
>
>


Spark 3.0.1 new Proleptic Gregorian calendar

2020-11-19 Thread Saurabh Gulati
Hello,
First of all, thanks to you guys for maintaining and improving Spark.

We just updated to Spark 3.0.1 and are facing some issues with the new 
Proleptic Gregorian calendar.

We have data from different sources in our platform and we saw there were some 
date/timestamp columns that go back to years before 1500.

According to this post, data written with Spark 2.4 and read with 3.0 should 
result in some difference in dates/timestamps, but we are not able to 
replicate this issue. We only encounter an exception that suggests we set the 
spark.sql.legacy.parquet.datetimeRebaseModeInRead/Write config options to make 
it work.

So, our main concerns are:

  *   How can we test/replicate this behavior? Since it's not very clear to 
us, nor do we see any docs for this change, we can't decide with certainty 
which parameters to set and why.
  *   What config options should we set,
     *   if we are always going to read old data written with Spark 2.4 using 
Spark 3.0
     *   if we will always be writing newer data with Spark 3.0.

We couldn't make a deterministic/informed choice, so we thought it better to 
ask the community which scenarios will be impacted and what will still work 
fine.

Thanks
Saurabh




Re: Cannot perform operation after producer has been closed

2020-11-19 Thread Eric Beabes
THANK YOU SO MUCH! Will try it out & revert.

On Thu, Nov 19, 2020 at 8:18 AM Gabor Somogyi 
wrote:

> "spark.kafka.producer.cache.timeout" is available since 2.2.1 which can be
> increased as a temporary workaround.
> This is not super elegant but works which gives enough time to migrate to
> Spark 3.
>
>
> On Wed, Nov 18, 2020 at 11:12 PM Eric Beabes 
> wrote:
>
>> I must say.. *Spark has let me down in this case*. I am surprised an
>> important issue like this hasn't been fixed in Spark 2.4.
>>
>> I am fighting a battle of 'Spark Structured Streaming' Vs 'Flink' at work
>> & now because Spark 2.4 can't handle this *I've been asked to rewrite
>> the code in Flink*.
>>
>> Moving to Spark 3.0 is not an easy option 'cause Cloudera 6.2 doesn't
>> have a Spark 3.0 parcel, so we can't upgrade to 3.0.
>>
>> So sad. Let me ask one more time. *Is there no way to fix this in Spark
>> 2.4?*
>>
>>
>> On Tue, Nov 10, 2020 at 11:33 AM Eric Beabes 
>> wrote:
>>
>>> BTW, we are seeing this message as well: 
>>> *"org.apache.kafka.common.KafkaException:
>>> Producer** closed while send in progress"*. I am assuming this happens
>>> because of the previous issue.."producer has been closed", right? Or are
>>> they unrelated? Please advise. Thanks.
>>>
>>> On Tue, Nov 10, 2020 at 11:17 AM Eric Beabes 
>>> wrote:
>>>
 Thanks for the reply. We are on Spark 2.4. Is there no way to get this
 fixed in Spark 2.4?

 On Mon, Nov 2, 2020 at 8:32 PM Jungtaek Lim <
 kabhwan.opensou...@gmail.com> wrote:

> Which Spark version do you use? There's a known issue on Kafka
> producer pool in Spark 2.x which was fixed in Spark 3.0, so you'd like to
> check whether your case is bound to the known issue or not.
>
> https://issues.apache.org/jira/browse/SPARK-21869
>
>
> On Tue, Nov 3, 2020 at 1:53 AM Eric Beabes 
> wrote:
>
>> I know this is related to Kafka but it happens during the Spark
>> Structured Streaming job that's why I am asking on this mailing list.
>>
>> How would you debug this or get around this in Spark Structured
>> Streaming? Any tips would be appreciated. Thanks.
>>
>>
>> java.lang.IllegalStateException: Cannot perform operation after
>> producer has been closed at
>> org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:853)
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
>> at
>> org.apache.spark.sql.kafka010.KafkaRowWriter.sendRow(KafkaWriteTask.scala:92)
>> at
>> org.apache.spark.sql.kafka010.KafkaStreamDataWriter.write(KafkaStreamWriter.scala:95)
>>
>


Re: Cannot perform operation after producer has been closed

2020-11-19 Thread Gabor Somogyi
"spark.kafka.producer.cache.timeout" is available since 2.2.1 which can be
increased as a temporary workaround.
This is not super elegant but works which gives enough time to migrate to
Spark 3.
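
For example, in a PySpark job the option could be set when building the
session (or equivalently via --conf on spark-submit); the 30m value below is
only an illustration, pick something longer than your longest batch/idle
period:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("kafka-sink-workaround")
    # Keep cached Kafka producers alive longer before they are closed.
    .config("spark.kafka.producer.cache.timeout", "30m")
    .getOrCreate()
)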


On Wed, Nov 18, 2020 at 11:12 PM Eric Beabes 
wrote:

> I must say.. *Spark has let me down in this case*. I am surprised an
> important issue like this hasn't been fixed in Spark 2.4.
>
> I am fighting a battle of 'Spark Structured Streaming' Vs 'Flink' at work
> & now because Spark 2.4 can't handle this *I've been asked to rewrite the
> code in Flink*.
>
> Moving to Spark 3.0 is not an easy option 'cause Cloudera 6.2 doesn't have
> a Spark 3.0 parcel, so we can't upgrade to 3.0.
>
> So sad. Let me ask one more time. *Is there no way to fix this in Spark
> 2.4?*
>
>
> On Tue, Nov 10, 2020 at 11:33 AM Eric Beabes 
> wrote:
>
>> BTW, we are seeing this message as well: 
>> *"org.apache.kafka.common.KafkaException:
>> Producer** closed while send in progress"*. I am assuming this happens
>> because of the previous issue.."producer has been closed", right? Or are
>> they unrelated? Please advise. Thanks.
>>
>> On Tue, Nov 10, 2020 at 11:17 AM Eric Beabes 
>> wrote:
>>
>>> Thanks for the reply. We are on Spark 2.4. Is there no way to get this
>>> fixed in Spark 2.4?
>>>
>>> On Mon, Nov 2, 2020 at 8:32 PM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 Which Spark version do you use? There's a known issue on Kafka producer
 pool in Spark 2.x which was fixed in Spark 3.0, so you'd like to check
 whether your case is bound to the known issue or not.

 https://issues.apache.org/jira/browse/SPARK-21869


 On Tue, Nov 3, 2020 at 1:53 AM Eric Beabes 
 wrote:

> I know this is related to Kafka but it happens during the Spark
> Structured Streaming job that's why I am asking on this mailing list.
>
> How would you debug this or get around this in Spark Structured
> Streaming? Any tips would be appreciated. Thanks.
>
>
> java.lang.IllegalStateException: Cannot perform operation after
> producer has been closed at
> org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:853)
> at
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:862)
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846)
> at
> org.apache.spark.sql.kafka010.KafkaRowWriter.sendRow(KafkaWriteTask.scala:92)
> at
> org.apache.spark.sql.kafka010.KafkaStreamDataWriter.write(KafkaStreamWriter.scala:95)
>



Re: Need Unit test complete reference for Pyspark

2020-11-19 Thread Sofia’s World
Hey,
they are good libraries to get you started. I have used both of them;
unfortunately, as far as I saw when I started to use them, only a few
people maintain them.
But you can get pointers out of them for writing tests. The code below can
get you started.
What you'll need is:

- a method to create a DataFrame on the fly, perhaps from a string; have a
look at pandas, which has methods for this (see the sketch after the test
code below)
- a method to test DataFrame equality; you can use df1.subtract(df2)

I am assuming you are using DataFrames rather than RDDs, for which the two
packages you mention should have everything you need.

hth
 marco


import pytest
from pyspark.sql import SparkSession


@pytest.fixture
def spark_session():
    # A small local session is enough for unit tests.
    return SparkSession.builder \
        .master('local[1]') \
        .appName('SparkByExamples.com') \
        .getOrCreate()


def test_create_table(spark_session):
    df = spark_session.createDataFrame([['one', 'two']]).toDF('first', 'second')
    df.show()

    df2 = spark_session.createDataFrame([['one', 'two']]).toDF('first', 'second')

    # subtract returns the rows of df that are not in df2; an empty result
    # here means the two frames contain the same rows.
    assert df.subtract(df2).count() == 0
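

For the first point, building a DataFrame on the fly from a string via
pandas, a minimal sketch (assuming pandas is installed; the column names and
data are made up) could be:

import io

import pandas as pd


def test_create_table_from_csv_string(spark_session):
    csv_data = "first,second\none,two\n"
    # pandas parses the inline CSV, then Spark converts the pandas frame.
    pandas_df = pd.read_csv(io.StringIO(csv_data))
    df = spark_session.createDataFrame(pandas_df)

    assert df.columns == ['first', 'second']
    assert df.count() == 1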




On Thu, Nov 19, 2020 at 6:38 AM Sachit Murarka 
wrote:

> Hi Users,
>
> I have to write Unit Test cases for PySpark.
> I think pytest-spark and "spark testing base" are good test libraries.
>
> Can anyone please provide a full reference for writing the test cases in
> Python using these?
>
> Kind Regards,
> Sachit Murarka
>