Re: Does explode lead to more usage of memory

2020-01-18 Thread Jörn Franke
Why not two tables, which you can then join? That would be the standard way. It depends on your full use case: what volumes of orders you expect on average, and what your aggregations and filters look like. The example below does a select all on the table.
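A minimal sketch of the two-table suggestion using plain Scala collections (the row types, field names, and sample values are made up for illustration, not from the original post): orders live in their own table keyed by user, and a join recovers the combined view without storing arrays at all.

```scala
// Hypothetical row types mirroring the schema discussed in the thread.
case class User(user: String, language: String)
case class Order(user: String, id: String, amount: Double)

val users  = Seq(User("alice", "en"), User("bob", "de"))
val orders = Seq(
  Order("alice", "o1", 10.0),
  Order("alice", "o2", 5.0),
  Order("bob",   "o3", 7.5))

// Collection equivalent of an inner join on the user column.
val joined = for {
  u <- users
  o <- orders if o.user == u.user
} yield (u.user, u.language, o.id, o.amount)
```

In Spark the same shape would be two DataFrames joined on the user column; the tradeoff versus arrays is an extra shuffle for the join against simpler, flatter rows.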

Re: Does explode lead to more usage of memory

2020-01-18 Thread Chris Teoh
I think it does mean more memory usage, but consider how big your arrays are. Think about your use case requirements and whether it makes sense to use arrays. It may also be preferable to explode if the arrays are very large: exploding makes the data more splittable than keeping the arrays inline.
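A plain-Scala analogy for what explode does (illustrative only, not the Spark API itself): each row with an array becomes one output row per element, and the scalar columns are copied into every output row. That duplication of the parent columns is the extra memory cost being discussed, while the one-row-per-element shape is what makes the result more splittable.

```scala
// Hypothetical nested row: one user with an array of order amounts.
case class Row(user: String, amounts: Seq[Double])

val nested = Seq(
  Row("alice", Seq(10.0, 5.0)),
  Row("bob",   Seq(7.5)))

// flatMap models explode: `user` is duplicated into every output row.
val exploded = nested.flatMap(r => r.amounts.map(a => (r.user, a)))
```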

Does explode lead to more usage of memory

2020-01-18 Thread V0lleyBallJunki3
I am using a dataframe with a structure like this:

root
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- id: string (nullable = true)
 |-- user: string (nullable = true)
 |-- language: string (nullable = true)

Re: How to implement "getPreferredLocations" in Data source v2?

2020-01-18 Thread Russell Spitzer
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/InputPartition.java See InputPartition, which has a preferredLocations method that you should override. On Sat, Jan 18, 2020, 1:44 PM kineret M wrote: > Hi, > I would like to support data loca
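In the Data Source V2 API, InputPartition declares a default preferredLocations() returning an Array[String] of executor hostnames where the partition's data lives. A minimal sketch of the override pattern, using a stand-in trait so it runs without Spark on the classpath (MyPartition and the hostname are made up):

```scala
// Stand-in for org.apache.spark.sql.connector.read.InputPartition,
// which declares preferredLocations() with an empty default.
trait InputPartitionLike extends Serializable {
  def preferredLocations(): Array[String] = Array.empty
}

// A partition that reports the host holding its data block, so the
// scheduler can try to place the reader task on that node.
class MyPartition(host: String) extends InputPartitionLike {
  override def preferredLocations(): Array[String] = Array(host)
}

val p = new MyPartition("worker-1.example.com")
```

Note that preferred locations are a hint: Spark attempts node-local scheduling but will still run the task elsewhere if the preferred executors are busy.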

How to implement "getPreferredLocations" in Data source v2?

2020-01-18 Thread kineret M
Hi, I would like to support data locality in Spark data source v2. How can I provide Spark the ability to read and process data on the same node? I didn't find any interface that supports 'getPreferredLocations' (or equivalent). Thanks!

How to prevent and track data loss/dropped due to watermark during structure streaming aggregation

2020-01-18 Thread stevech.hu
We have a scenario to group raw records by correlation id every 3 minutes and append the grouped result to an HDFS store. Below is an example of our query:

val df = records.readStream.format("SomeDataSource")
  .selectExpr("current_timestamp() as CurrentTime", "*")
  .withWatermark("Curr
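A simplified model of the watermark semantics behind the question, in plain Scala (the 10-minute delay and the event times are made up; real Spark advances the watermark per trigger from the max event time it has seen): the watermark is max event time minus the delay, and rows whose event time falls below it are dropped from the aggregation. Partitioning the input this way is one cheap way to count, and thus track, the rows that would be lost.

```scala
// Simplified watermark model: watermark = max(eventTime) - delay.
// Rows with eventTime below the watermark are dropped -- this is the
// data loss the thread asks how to track.
val delayMs = 10 * 60 * 1000L  // hypothetical 10-minute watermark delay
val eventTimesMs = Seq(1000L, 600000L, 1200000L, 100000L)

val watermarkMs = eventTimesMs.max - delayMs
val (kept, dropped) = eventTimesMs.partition(_ >= watermarkMs)
```

In a real job you cannot observe the dropped rows from inside the aggregation itself; a common workaround is to run a second, watermark-free query (or a foreachBatch count) over the same source and compare totals.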