Spark join: grouping records with the same value for a particular column in the same partition

2020-02-26 Thread ARAVIND ARUMUGHAM SETHURATHNAM
Hi,
We have 2 Hive tables which are read in Spark and joined using a join key,
let's call it user_id.
Then we write this joined dataset to S3 and register it in Hive as a 3rd table
for subsequent tasks to use.
One of the other columns in the joined dataset is called keychain_id.

We want to group all the user records belonging to the same keychain_id into
the same partition, to avoid shuffles later.
So, can I do a repartition("keychain_id") before writing to S3 and registering
it in Hive, and when I read the same data back from this third table, will it
still have the same partition grouping (all users belonging to the same
keychain_id in the same partition)? We are trying to avoid doing a
repartition("keychain_id") every time we read from this 3rd table.
Can you please clarify? If there is no guarantee that it will retain the same
partition grouping while reading, is there another efficient way this can be
done, other than caching?
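
For context, the write step being described would look roughly like this (just
a sketch; the table names and S3 path are placeholders):

```
// Sketch only: table names and the S3 path are placeholders.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

val t1 = spark.table("db.table_one")   // 1st Hive table
val t2 = spark.table("db.table_two")   // 2nd Hive table

// Join the two Hive tables on user_id.
val joined = t1.join(t2, Seq("user_id"))

// Group all records with the same keychain_id into the same partition,
// then write to S3 and register the result as a 3rd Hive table.
joined
  .repartition(col("keychain_id"))
  .write
  .mode("overwrite")
  .option("path", "s3a://bucket/joined/")
  .saveAsTable("db.joined_table")
```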
Regards,
Aravind


Re: Standard practices for building dashboards for spark processed data

2020-02-26 Thread Breno Arosa
I have been using Athena/Presto to read the parquet files in the data lake; if
you are already saving data to S3, I think this is the easiest option.
Then I use Redash or Metabase to build dashboards (they have different
limitations); both are very intuitive to use and easy to set up with Docker.






Re: [External Email] Re: Standard practices for building dashboards for spark processed data

2020-02-26 Thread Aniruddha P Tekade
Hi Roland,

Thank you for your reply. That's quite helpful. I think I should try
InfluxDB then. But I am curious: in the case of Prometheus, would writing a
custom exporter be a good choice and solve the purpose efficiently? Grafana is
not something I want to drop.

Best,
Aniruddha
---


On Tue, Feb 25, 2020 at 11:36 PM Roland Johann wrote:

> Hi Ani,
>
> Prometheus is not well suited for ingesting explicit time-series data; its
> purpose is technical monitoring. If you want to monitor your Spark jobs
> with Prometheus, you can publish the metrics so Prometheus can scrape them.
> What you probably are looking for is a time-series database that you can
> push metrics to.
>
> Looking for an alternative to Grafana should be done only if you find
> Grafana is not well suited to your visualization use case.
>
> As said earlier, at a quick glance it sounds like you should look for an
> alternative to Prometheus.
>
> For time series you can look at TimescaleDB or InfluxDB. Other databases,
> like plain SQL databases or Cassandra, lack up/downsampling capabilities,
> which can lead to large query responses and the need for the client to
> post-process.
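
As a rough illustration of the push model suggested here (not code from this
thread; the InfluxDB endpoint, database, and measurement names are made-up
placeholders, assuming an InfluxDB 1.x-style /write endpoint), a structured
streaming job could publish per-batch metrics from foreachBatch:

```
// Rough sketch: push one metric per micro-batch to an InfluxDB 1.x-style
// /write endpoint using the line protocol. The host, database, and
// measurement names below are made-up placeholders.
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("metrics-push-sketch").getOrCreate()

// POST one line-protocol record; error handling is omitted in this sketch.
def pushLine(line: String): Unit = {
  val conn = new URL("http://influxdb:8086/write?db=pipeline_metrics")
    .openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("POST")
  conn.setDoOutput(true)
  val out = conn.getOutputStream
  try out.write(line.getBytes(StandardCharsets.UTF_8)) finally out.close()
  conn.getResponseCode // force the request
  conn.disconnect()
}

// Record the batch id and row count once per micro-batch.
val recordBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
  pushLine(s"pipeline_batches,job=ingest batch_id=${batchId}i,rows=${batch.count()}i")
}

val source = spark.readStream.format("rate").load() // stand-in for the real source

source.writeStream
  .foreachBatch(recordBatch)
  .start()
  .awaitTermination()
```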
>
> Kind regards,
>
> Aniruddha P Tekade wrote on Wed, Feb 26, 2020 at 02:23:
>
>> Hello,
>>
>> I am trying to build a data pipeline that uses Spark Structured Streaming
>> with the Delta project and runs on Kubernetes. Because of this, I get my
>> output files only in parquet format. Since I am asked to use Prometheus and
>> Grafana for building the dashboard for this pipeline, I run another small
>> Spark job to convert the output into JSON so that I would be able to get it
>> into Grafana. Although I can see that this step is redundant, given the
>> importance of the Delta Lake project I cannot write my data directly as
>> JSON. Therefore I need some help/guidelines/opinions about moving forward
>> from here.
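
For reference, the intermediate conversion step described above amounts to
roughly the following (a sketch; both S3 paths are placeholders):

```
// Sketch of the intermediate job described above: read the streaming job's
// parquet output and rewrite it as JSON. Both S3 paths are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parquet-to-json").getOrCreate()

spark.read.parquet("s3a://bucket/pipeline-output-parquet/")
  .write
  .mode("overwrite")
  .json("s3a://bucket/pipeline-output-json/")
```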
>>
>> I would appreciate it if Spark users could suggest some practices to
>> follow with respect to the following questions:
>>
>>    1. Since I cannot have direct JSON output from Spark structured
>>    streams, is there any better way to convert parquet into JSON? Or should
>>    I keep only parquet?
>>    2. Will I need to write a custom exporter for Prometheus so as to
>>    make Grafana read those time-series data?
>>    3. Is there any better dashboard alternative to Grafana for this
>>    requirement?
>>    4. Since the pipeline is going to run on Kubernetes, I am trying to
>>    avoid InfluxDB as the time-series database and go with Prometheus. Is
>>    this approach correct?
>>
>> Thanks,
>> Ani
>> ---
>>
> --
> Roland Johann
> Software Developer/Data Engineer
>
> phenetic GmbH
> Lütticher Straße 10, 50674 Köln, Germany
>
> Mobil: +49 172 365 26 46
> Mail: roland.joh...@phenetic.io
> Web: phenetic.io
>
> Handelsregister: Amtsgericht Köln (HRB 92595)
> Geschäftsführer: Roland Johann, Uwe Reimann
>


Re: [Spark SQL] Memory problems with packing too many joins into the same WholeStageCodegen

2020-02-26 Thread Liu Genie
Exactly. My problem is a big dataframe joining a lot of small dataframes, which
I convert to Maps and then apply via a UDF on the big dataframe. (Broadcast
didn't work with too many joins.)
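
A minimal sketch of that pattern, with made-up column names and data:

```
// Sketch of the "collect the small side to a Map, look it up in a UDF" pattern.
// Column names and data are made up for illustration.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().appName("udf-lookup-sketch").getOrCreate()
import spark.implicits._

val big   = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "payload")
val small = Seq((1, "x"), (2, "y")).toDF("id", "label")

// Collect the small dataframe to the driver and broadcast it as a plain Map.
val lookup = spark.sparkContext.broadcast(small.as[(Int, String)].collect().toMap)

// The "join" becomes a map lookup applied to the big dataframe.
val lookupLabel = udf((id: Int) => lookup.value.get(id))

big.withColumn("label", lookupLabel(col("id"))).show()
```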

On Feb 26, 2020, at 09:32, Jianneng Li <jianneng...@workday.com> wrote:

I could be wrong, but I'm guessing that it uses the UDF as the build side of a
hash join. So the hash table is inside the UDF, and the UDF is called to
perform the join. There are limitations to this approach, of course; you can't
do all joins this way.

Best,

Jianneng

From: yeikel valdes <em...@yeikel.com>
Sent: Tuesday, February 25, 2020 5:48 AM
To: Jianneng Li <jianneng...@workday.com>
Cc: user@spark.apache.org; genie_...@outlook.com
Subject: Re: [Spark SQL] Memory problems with packing too many joins into the 
same WholeStageCodegen

Can you please explain what you mean by that? How do you use a UDF to replace
a join? Thanks



On Mon, 24 Feb 2020 22:06:40 -0500, jianneng...@workday.com wrote:

Thanks Genie. Unfortunately, the joins I'm doing in this case are large, so a
UDF likely won't work.

Jianneng

From: Liu Genie <genie_...@outlook.com>
Sent: Monday, February 24, 2020 6:39 PM
To: user@spark.apache.org
Subject: Re: [Spark SQL] Memory problems with packing too many joins into the 
same WholeStageCodegen

I have encountered the too-many-joins problem before. Since the joined
dataframes were small enough, I converted the joins to UDF operations, which is
much faster and didn't cause an out-of-memory problem.

On Feb 25, 2020, at 10:15, Jianneng Li <jianneng...@workday.com> wrote:

Hello everyone,

WholeStageCodegen generates code that appends results into a
BufferedRowIterator, which keeps the results in an in-memory linked list.
Long story short, this is a problem when multiple joins (e.g.
BroadcastHashJoin) that can blow up get planned into the same
WholeStageCodegen: results keep accumulating in the linked list and do not get
consumed fast enough, eventually causing the JVM to run out of memory.

Does anyone else have experience with this problem? Some obvious solutions
include making BufferedRowIterator spill the linked list, or making it bounded,
but I'd imagine that this would have been done a long time ago if it were
necessary.

Thanks,

Jianneng



[SPARK-30957][SQL] Null-safe variant of Dataset.join(Dataset[_], Seq[String])

2020-02-26 Thread Enrico Minack
I have created a jira to track this request: 
https://issues.apache.org/jira/browse/SPARK-30957


Enrico

On 08.02.20 at 16:56, Enrico Minack wrote:


Hi Devs,

I am forwarding this from the user mailing list. I agree that the <=> 
version of join(Dataset[_], Seq[String]) would be useful.


Does any PMC consider this useful enough to be added to the Dataset 
API? I'd be happy to create a PR in that case.


Enrico



-------- Forwarded Message --------
Subject: dataframe null safe joins given a list of columns
Date:    Thu, 6 Feb 2020 12:45:11 +
From:    Marcelo Valle 
To:      user @spark 



I was surprised I couldn't find a way of solving this in Spark, as it must be
a very common problem for users, so I decided to ask here.


Consider the code below:

```
val joinColumns = Seq("a", "b")
val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null, 
"c4")).toDF("a", "b", "c")
val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null, 
"d4")).toDF("a", "b", "d")

df1.join(df2, joinColumns).show()
```

The output is :

```
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
| a1| b1| c1| d1|
+---+---+---+---+
```

But I want it to be:

```
+---+----+---+---+
|  a|   b|  c|  d|
+---+----+---+---+
| a1|  b1| c1| d1|
| a4|null| c4| d4|
+---+----+---+---+
```

The join syntax of `df1.join(df2, joinColumns)` has some advantages, 
as it doesn't create duplicate columns by default. However, it uses 
the operator `===` to join, not the null-safe one `<=>`.


Using the following syntax:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
```

Would produce:

```
+---++---+---++---+
|  a|   b|  c|  a|   b|  d|
+---++---+---++---+
| a1|  b1| c1| a1|  b1| d1|
| a4|null| c4| a4|null| d4|
+---++---+---++---+
```

So to get the result I really want, I must do:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> 
df2("b")).drop(df2("a")).drop(df2("b")).show()

+---++---+---+
|  a|   b|  c|  d|
+---++---+---+
| a1|  b1| c1| d1|
| a4|null| c4| d4|
+---++---+---+
```

This works, but it is really verbose, especially when you have many join
columns.


Is there a better way of solving this without needing a utility method? I run
into this same problem in every Spark project.
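
For reference, the kind of utility helper being referred to would look roughly
like this (a sketch only; not part of the Dataset API):

```
// Sketch of the utility helper referred to above: builds a null-safe (<=>)
// join condition from a list of column names and drops the duplicated
// right-hand columns afterwards.
import org.apache.spark.sql.DataFrame

def joinNullSafe(left: DataFrame, right: DataFrame, columns: Seq[String]): DataFrame = {
  val condition = columns.map(c => left(c) <=> right(c)).reduce(_ && _)
  columns.foldLeft(left.join(right, condition))((df, c) => df.drop(right(c)))
}

// joinNullSafe(df1, df2, joinColumns).show() then gives the desired output shown above.
```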



