[Spark SQL] error in performing dataset union with complex data type (struct, list)

2018-06-02 Thread Pranav Agrawal
I can't get around this error when performing a union of two datasets
(ds1.union(ds2)) that have complex data types (struct, list):


*18/06/02 15:12:00 INFO ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception:
org.apache.spark.sql.AnalysisException: Union can only be performed on
tables with the compatible column types.
array<...>
<>
array<...>
at the 21th column of the second table;;*
As far as I can tell, they are the same. What am I doing wrong? Any help /
workaround appreciated!

spark version: 2.2.1

Thanks,
Pranav
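
For context on why this can happen even when the schemas look the same: union() matches
columns by position, and the nested struct/array types must line up exactly, so a different
column order, differently named nested fields, or a nullability difference inside the complex
type can be enough to trigger it (unionByName only arrived in Spark 2.3, so it is not an
option on 2.2.1). A minimal PySpark sketch of how one might diagnose and work around it; the
paths and the alignment step are illustrative assumptions, not code from the thread:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
ds1 = spark.read.parquet("path/to/ds1")   # hypothetical inputs
ds2 = spark.read.parquet("path/to/ds2")

# The error is positional; the reported "21th column" should correspond to index 20 here.
print(ds1.schema.fields[20].dataType.simpleString())
print(ds2.schema.fields[20].dataType.simpleString())

# Re-select ds2's columns in ds1's order and cast each one to ds1's declared type;
# this reconciles column-order and nullability mismatches when the data is compatible.
aligned_ds2 = ds2.select([col(f.name).cast(f.dataType) for f in ds1.schema.fields])
result = ds1.union(aligned_ds2)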


Re: Append In-Place to S3

2018-06-02 Thread Aakash Basu
As Jay correctly suggested, if you're joining against the existing data then
overwrite; appending alone won't remove the dups.

I think, in this scenario, just change it to write.mode('overwrite'), because
you're already reading the old data, and your job would be done.
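
A rough sketch of that overwrite flow (not from the thread; the key columns come from
Ben's query below, and the staging path is an assumption, since overwriting the same S3
path that is still being read lazily is risky):

existing_df = spark.read.parquet(existing_data_path)
new_df = spark.read.parquet(new_data_path)
# Drop duplicate keys across the combined set, then rewrite everything in one pass.
combined_df = existing_df.union(new_df).dropDuplicates(
    ["source", "source_id", "target", "target_id"]
)
combined_df.coalesce(1).write.parquet(
    staging_path, mode="overwrite", compression="gzip"
)

Once the staging write succeeds, the staging path can be swapped in place of the old
path outside Spark.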


On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim,  wrote:

> Hi Jay,
>
> Thanks for your response. Are you saying to append the new data, then
> remove the duplicates from the whole data set afterwards, overwriting the
> existing data set with the new, appended data set? I will give that
> a try.
>
> Cheers,
> Ben
>
> On Fri, Jun 1, 2018 at 11:49 PM Jay  wrote:
>
>> Benjamin,
>>
>> The append will append the "new" data to the existing data without removing
>> the duplicates. You would need to overwrite the file every time if you need
>> unique values.
>>
>> Thanks,
>> Jayadeep
>>
>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim  wrote:
>>
>>> I have a situation where I am trying to add only new rows to an existing
>>> data set that lives in S3 as gzipped parquet files, looping and appending
>>> for each hour of the day. First, I create a DF from the existing data, then
>>> I use a query to create another DF with the data that is new. Here is the
>>> code snippet.
>>>
>>> df = spark.read.parquet(existing_data_path)
>>> df.createOrReplaceTempView('existing_data')
>>> new_df = spark.read.parquet(new_data_path)
>>> new_df.createOrReplaceTempView('new_data')
>>> append_df = spark.sql(
>>> """
>>> WITH ids AS (
>>> SELECT DISTINCT
>>> source,
>>> source_id,
>>> target,
>>> target_id
>>> FROM new_data i
>>> LEFT ANTI JOIN existing_data im
>>> ON i.source = im.source
>>> AND i.source_id = im.source_id
>>> AND i.target = im.target
>>> AND i.target = im.target_id
>>> """
>>> )
>>> append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
>>> compression='gzip')
>>>
>>>
>>> I thought this would append new rows and keep the data unique, but I am
>>> seeing many duplicates. Can someone help me with this and tell me what I am
>>> doing wrong?
>>>
>>> Thanks,
>>> Ben
>>>
>>


Re: Append In-Place to S3

2018-06-02 Thread Benjamin Kim
Hi Jay,

Thanks for your response. Are you saying to append the new data, then
remove the duplicates from the whole data set afterwards, overwriting the
existing data set with the new, appended data set? I will give that
a try.

Cheers,
Ben

On Fri, Jun 1, 2018 at 11:49 PM Jay  wrote:

> Benjamin,
>
> The append will append the "new" data to the existing data without removing
> the duplicates. You would need to overwrite the file every time if you need
> unique values.
>
> Thanks,
> Jayadeep
>
> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim  wrote:
>
>> I have a situation where I am trying to add only new rows to an existing
>> data set that lives in S3 as gzipped parquet files, looping and appending
>> for each hour of the day. First, I create a DF from the existing data, then
>> I use a query to create another DF with the data that is new. Here is the
>> code snippet.
>>
>> df = spark.read.parquet(existing_data_path)
>> df.createOrReplaceTempView('existing_data')
>> new_df = spark.read.parquet(new_data_path)
>> new_df.createOrReplaceTempView('new_data')
>> append_df = spark.sql(
>> """
>> WITH ids AS (
>> SELECT DISTINCT
>> source,
>> source_id,
>> target,
>> target_id
>> FROM new_data i
>> LEFT ANTI JOIN existing_data im
>> ON i.source = im.source
>> AND i.source_id = im.source_id
>> AND i.target = im.target
>> AND i.target = im.target_id
>> """
>> )
>> append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
>> compression='gzip')
>>
>>
>> I thought this would append new rows and keep the data unique, but I am
>> seeing many duplicates. Can someone help me with this and tell me what I am
>> doing wrong?
>>
>> Thanks,
>> Ben
>>
>


Re: [Spark2.1] SparkStreaming to Cassandra performance problem

2018-06-02 Thread Timur Shenkao
Did you use RDDs or DataFrames?
What is the Spark version?

On Mon, May 28, 2018 at 10:32 PM, Saulo Sobreiro 
wrote:

> Hi,
> I ran a few more tests and found that even with a lot more operations on
> the scala side, python is outperformed...
>
> Dataset Stream duration: ~3 minutes (csv formatted data messages read from
> Kafka)
> Scala process/store time: ~3 minutes (map with split + metrics
> calculations + store raw + store metrics)
> Python process/store time: ~7 minutes (map with split + store raw )
>
> This is the difference between being usable in production or not. I get
> that python is likely to be slower because of the Python-to-Java object
> transformations, but I was not expecting such a huge difference.
>
> These results are very interesting, as I was comparing to the time that an
> "equivalent" application in storm takes to process the exact same stream
> (~3 minutes as well) for the same results and spark was clearly losing the
> race.
>
> Thank you all for your feedback :)
>
> Regards,
> Saulo
>
> On 21/05/2018 14:09:40, Russell Spitzer  wrote:
> The answer is most likely that when you use cross Java-Python code you
> incur a penalty for every object that you transform from a Java object
> into a Python object (and then back again to a Java object) when data is
> being passed in and out of your functions. A way around this would probably
> be to have used the Dataframe API if possible, which would have compiled
> the interactions in Java and skipped python-java serialization. Using Scala
> from the start though is a great idea. I would also probably remove the
> cache from your stream since that probably is only hurting (adding an
> additional serialization which is only used once.)
>
> On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman 
> wrote:
>
>> The main language they developed spark with is scala, so all the new
>> features go first to scala, java and finally python. I'm not surprised by
>> the results, we've seen it on Stratio since the first versions of spark. At
>> the beginning of development, some of our engineers make the prototype with
>> python, but when it comes down to it, if it goes into production, it has to
>> be rewritten in scala or java, usually scala.
>>
>>
>>
>> On Mon, May 21, 2018 at 4:34, Saulo Sobreiro (<
>> saulo.sobre...@outlook.pt>) wrote:
>>
>>> Hi Javier,
>>>
>>> Thank you a lot for the feedback.
>>> Indeed the CPU is a huge limitation. I ran into a lot of trouble trying to
>>> run this use case in yarn-client mode. I managed to run this in standalone
>>> (local master) mode only.
>>>
>>> I do not have the hardware available to run this setup in a cluster yet,
>>> so I decided to dig a little bit more into the implementation to see what
>>> I could improve. I just finished evaluating some results.
>>> If you find something wrong or odd please let me know.
>>>
>>> Following your suggestion to use "saveToCassandra" directly I decided to
>>> try Scala. Everything was implemented in the most similar way possible and
>>> I was surprised by the results. The scala implementation is much faster.
>>>
>>> My current implementation is slightly different from the Python code
>>> shared some emails ago, but to compare the languages' influence in the most
>>> comparable way, I used the following snippets:
>>>
>>> # Scala implementation --
>>>
>>> val kstream = KafkaUtils.createDirectStream[String, String](
>>>   ssc,
>>>   LocationStrategies.PreferConsistent,
>>>   ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
>>> kstream
>>>   .map( x => parse(x.value) )
>>>   .saveToCassandra("hdpkns", "batch_measurement")
>>>
>>> # Python implementation 
>>> # Adapted from the previously shared code. However instead of
>>> calculating the metrics, it is just parsing the messages.
>>>
>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>     {"metadata.broker.list": brokers})
>>>
>>> kafkaStream \
>>>     .transform(parse) \
>>>     .foreachRDD(casssave)
>>>
>>> For the same streaming input the scala app took an average of ~1.5
>>> seconds to handle each event. For the python implementation, the app took
>>> an average of ~80 seconds to handle each event (and that only after working
>>> through a lot of pickle concurrent-access issues).
>>>
>>> Note that I considered the time as the difference between the event
>>> generation (before being published to Kafka) and the moment just before the
>>> saveToCassandra.
>>>
>>> The problem in the python implementation seems to be due to the delay
>>> introduced by the foreachRDD(casssave) call, which only runs 
>>> rdd.saveToCassandra("test_hdpkns", "measurement").
>>>
>>>
>>> Honestly, I was not expecting such a difference between these two
>>> implementations... Do you understand why this is happening?
>>>
>>>
>>>
>>> Again, Thank you very much for your help,
>>>
>>> Best Regards
>>>
>>>
>>> Sharing my current 
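
As a rough illustration of the DataFrame-API route Russell describes above (not code from
the thread; the column names, the parse step, and the Cassandra write options are assumptions
layered on the snippets shared earlier), a PySpark sketch could look like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def save_batch(rdd):
    # foreachRDD invokes this on the driver once per micro-batch
    if rdd.isEmpty():
        return
    # Only the line split below runs in Python; the write itself is planned
    # and executed in the JVM through the DataFrame API.
    df = rdd.map(lambda line: line.split(",")).toDF(
        ["sensor_id", "ts", "value"]   # hypothetical column names
    )
    (df.write
       .format("org.apache.spark.sql.cassandra")
       .options(keyspace="test_hdpkns", table="measurement")
       .mode("append")
       .save())

# kafkaStream is the DStream from the snippet above; the Kafka values are CSV lines
kafkaStream.map(lambda kv: kv[1]).foreachRDD(save_batch)

This avoids the per-record Python round trip for everything after the initial parse, which
is where most of the Python overhead described above comes from.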

Re: Append In-Place to S3

2018-06-02 Thread vincent gromakowski
Structured streaming can provide idempotent and exactly-once writes to
parquet, but I don't know how it does that under the hood.
Without this, you need to load your whole dataset, then dedup, then write the
entire dataset back. This overhead can be minimized by partitioning the
output files.
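
A small sketch of the partitioning idea (not from the thread; the event_hour layout, the
hour value, and new_rows_for_hour are hypothetical): if the data is laid out by hour, each
run only has to read, dedup, and rewrite the partitions that actually received new rows.

hour = "2018-06-02-15"                                   # example partition value
partition_path = existing_data_path + "/event_hour=" + hour

existing_part = spark.read.parquet(partition_path)
merged_part = existing_part.union(new_rows_for_hour).dropDuplicates(
    ["source", "source_id", "target", "target_id"]
)
# Write to a sibling staging directory and swap it in afterwards; overwriting
# a directory that is still being read lazily is not safe.
merged_part.write.parquet(
    partition_path + ".staging", mode="overwrite", compression="gzip"
)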

On Fri, Jun 1, 2018 at 18:01, Benjamin Kim  wrote:

> I have a situation where I am trying to add only new rows to an existing data
> set that lives in S3 as gzipped parquet files, looping and appending for
> each hour of the day. First, I create a DF from the existing data, then I
> use a query to create another DF with the data that is new. Here is the
> code snippet.
>
> df = spark.read.parquet(existing_data_path)
> df.createOrReplaceTempView('existing_data')
> new_df = spark.read.parquet(new_data_path)
> new_df.createOrReplaceTempView('new_data')
> append_df = spark.sql(
> """
> WITH ids AS (
> SELECT DISTINCT
> source,
> source_id,
> target,
> target_id
> FROM new_data i
> LEFT ANTI JOIN existing_data im
> ON i.source = im.source
> AND i.source_id = im.source_id
> AND i.target = im.target
> AND i.target = im.target_id
> """
> )
> append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
> compression='gzip')
>
>
> I thought this would append new rows and keep the data unique, but I am
> seeing many duplicates. Can someone help me with this and tell me what I am
> doing wrong?
>
> Thanks,
> Ben
>


Re: Append In-Place to S3

2018-06-02 Thread Jay
Benjamin,

The append will append the "new" data to the existing data without removing
the duplicates. You would need to overwrite the file every time if you need
unique values.

Thanks,
Jayadeep

On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim  wrote:

> I have a situation where I am trying to add only new rows to an existing data
> set that lives in S3 as gzipped parquet files, looping and appending for
> each hour of the day. First, I create a DF from the existing data, then I
> use a query to create another DF with the data that is new. Here is the
> code snippet.
>
> df = spark.read.parquet(existing_data_path)
> df.createOrReplaceTempView('existing_data')
> new_df = spark.read.parquet(new_data_path)
> new_df.createOrReplaceTempView('new_data')
> append_df = spark.sql(
> """
> WITH ids AS (
> SELECT DISTINCT
> source,
> source_id,
> target,
> target_id
> FROM new_data i
> LEFT ANTI JOIN existing_data im
> ON i.source = im.source
> AND i.source_id = im.source_id
> AND i.target = im.target
> AND i.target = im.target_id
> """
> )
> append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
> compression='gzip')
>
>
> I thought this would append new rows and keep the data unique, but I am
> seeing many duplicates. Can someone help me with this and tell me what I am
> doing wrong?
>
> Thanks,
> Ben
>