Big Broadcast Hash Join with Dynamic Partition Pruning gives wrong results

2021-04-07 Thread Tomas Bartalos
When I try to do a Broadcast Hash Join with a bigger broadcast table (6 million
rows), I get an incorrect result of 0 rows.

val rightDF = spark.read.format("parquet").load("table-a")
val leftDF =  spark.read.format("parquet").load("table-b")
  //needed to activate dynamic pruning subquery
  .where('part_ts === 20210304000L)

// leftDF has 7 Mil rows ~ 120 MB
val join = broadcast(leftDF).join(rightDF,
  $"match_part_id" === $"part_id" && $"match_id" === $"id"
)
join.count

res1: Long = 0

I think it's connected with Dynamic Partition Pruning of the rightDF, which
is happening according to the plan:

PartitionFilters: [isnotnull(part_id#477L),
dynamicpruningexpression(part_id#477L IN dynamicpruning#534)]

= Subqueries =

Subquery:1 Hosting operator id = 6 Hosting Expression = part_id#477L
IN dynamicpruning#534
ReusedExchange (11)


(11) ReusedExchange [Reuses operator id: 5]
Output [4]: [match_part_id#487L, match_id#488L, UK#489, part_ts#490L]

*Removing the broadcast hint OR shrinking the broadcasted table corrects
the result*:

val rightDF = spark.read.format("parquet").load("table-a")
val leftDF =  spark.read.format("parquet").load("table-b")
  //needed to activate dynamic pruning subquery
  .where('part_ts === 20210304000L)
 // shrinks the broadcasted table to 18K rows
 .where('match_id === 33358792)

// leftDF has 18K rows
val join = broadcast(leftDF).join(rightDF,
  $"match_part_id" === $"part_id" && $"match_id" === $"id"
)
join.count

res2: Long = 379701

I would expect the broadcast to fail (if anything), but I would never expect to
get incorrect results without an exception. What do you think?
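
In case it helps narrow things down, here is a minimal sketch of what I plan to
try next - it assumes Spark 3.x, where dynamic partition pruning can be toggled
via spark.sql.optimizer.dynamicPartitionPruning.enabled (my assumption of the
relevant knob, not a confirmed fix):

// Sketch: rerun the same broadcast join with dynamic partition pruning disabled
// and compare the counts; if they differ, DPP is the likely culprit.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")
val joinNoDpp = broadcast(leftDF).join(rightDF,
  $"match_part_id" === $"part_id" && $"match_id" === $"id"
)
joinNoDpp.count  // compare with the count obtained without the broadcast hint
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")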


BR,

Tomas


Re: Parquet read performance for different schemas

2019-09-20 Thread Tomas Bartalos
I forgot to mention an important part: I'm issuing the same query against both
parquets, selecting only one column:

df.select(sum('amount))
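
For reference, this is roughly how I'm checking what actually gets read - a
sketch, assuming Spark 2.4 where nested schema pruning sits behind the
experimental flag spark.sql.optimizer.nestedSchemaPruning.enabled, and with the
paths standing in for my real files:

// Sketch: enable nested schema pruning (relevant because parquet-wide also has
// an array column) and inspect the physical plan's ReadSchema to confirm that
// only the `amount` column is requested from the file.
// (Run in spark-shell; otherwise import org.apache.spark.sql.functions._ and spark.implicits._)
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
val wide = spark.read.parquet("parquet-wide")
wide.select(sum('amount)).explain(true)   // ReadSchema should list just `amount`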

BR,
Tomas

On Thu, 19 Sep 2019 at 18:10, Tomas Bartalos  wrote:

> Hello,
>
> I have 2 parquets (each containing 1 file):
>
>- parquet-wide - schema has 25 top level cols + 1 array
>- parquet-narrow - schema has 3 top level cols
>
> Both files have the same data for the given columns.
> When I read from parquet-wide, Spark reports *read 52.6 KB*; from
> parquet-narrow, *only 2.6 KB*.
> For a bigger dataset the difference is *413 MB vs 961 MB*. Needless to say,
> reading the narrow parquet is much faster.
>
> Since schema pruning is applied, I *expected to get similar results* for
> both scenarios (timing and amount of data read).
> What do you think is the reason for such a big difference? Is there any
> tuning I can do?
>
> Thank you,
> Tomas
>


Parquet read performance for different schemas

2019-09-19 Thread Tomas Bartalos
Hello,

I have 2 parquets (each containing 1 file):

   - parquet-wide - schema has 25 top level cols + 1 array
   - parquet-narrow - schema has 3 top level cols

Both files have the same data for the given columns.
When I read from parquet-wide, Spark reports *read 52.6 KB*; from
parquet-narrow, *only 2.6 KB*.
For a bigger dataset the difference is *413 MB vs 961 MB*. Needless to say,
reading the narrow parquet is much faster.

Since schema pruning is applied, I *expected to get similar results* for
both scenarios (timing and amount of data read).
What do you think is the reason for such a big difference? Is there any
tuning I can do?

Thank you,
Tomas


Partition pruning by IDs from another table

2019-07-12 Thread Tomas Bartalos
Hello,
I have 2 parquet tables:
stored - table of 10 M records
data - table of 100K records

*This is fast:*
val dataW = data.where("registration_ts in (20190516204l,
20190515143l,20190510125l, 20190503151l)")
dataW.count
res44: Long = 42
//takes 3 seconds
stored.join(broadcast(dataW), Seq("registration_ts"), "leftsemi").collect

*Similar, but it's slow:*
val dataW = data.limit(10).select("registration_ts").distinct
dataW.count
res45: Long = 1
//takes 2 minutes
stored.join(broadcast(dataW), Seq("registration_ts"), "leftsemi").collect
[Stage 181:>  (0 + 1) / 373]

The reason is that the first query propagates PartitionFilters to the joined
"stored" table:
... PartitionFilters: [registration_ts#1635L IN
(20190516204,20190515143,20190510125,20190503151)
The second one does not:
PartitionFilters: []

For a low number of IDs it's more effective to collect them to the driver and
issue a second query with a partition filter (rough sketch below), but there
has to be a better way...
How can I achieve effective partition pruning when using IDs from another
table?

The following SQL has the same query plan and the same behavior:
spark.sql("select * from stored where exists (select 1 from dataW where
dataW.registration_ts = stored.registration_ts)")
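
For reference, the collect-to-driver workaround I mentioned above looks roughly
like this (a sketch of what I'd rather avoid):

// Sketch: collect the small set of IDs to the driver, then issue a second
// query whose IN-list Spark can push down as a PartitionFilter on "stored".
import spark.implicits._
val ids = dataW.select("registration_ts").distinct.as[Long].collect()
val pruned = stored.where(col("registration_ts").isin(ids: _*))
pruned.count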

Thank you,
Tomas


Re: Access to live data of cached dataFrame

2019-05-19 Thread Tomas Bartalos
I'm trying to re-read, however I'm getting cached data (which is a bit
confusing). For the re-read I'm issuing:
spark.read.format("delta").load("/data").groupBy(col("event_hour")).count

The cache seems to be global, influencing new dataframes as well.

So the question is how should I re-read without losing the cached data
(without using unpersist)?

As I mentioned, with SQL it's possible - I can create a cached view, so when I
access the original table I get live data, and when I access the view I get
cached data.
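
For completeness, a rough sketch of that SQL-view workaround (the view name is
made up):

// Sketch: cache a named temp view instead of calling cache() on the DataFrame.
// Per the behaviour described above, queries on the view return cached data,
// while going back to the source path returns live data.
spark.read.format("delta").load("/data")
  .groupBy(col("event_hour")).count()
  .createOrReplaceTempView("event_counts_cached")
spark.sql("CACHE TABLE event_counts_cached")

spark.table("event_counts_cached").show()          // cached snapshot
spark.read.format("delta").load("/data").count()   // live data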

BR,
Tomas

On Fri, 17 May 2019, 8:57 pm Sean Owen,  wrote:

> A cached DataFrame isn't supposed to change, by definition.
> You can re-read each time or consider setting up a streaming source on
> the table which provides a result that updates as new data comes in.
>
> On Fri, May 17, 2019 at 1:44 PM Tomas Bartalos 
> wrote:
> >
> > Hello,
> >
> > I have a cached dataframe:
> >
> >
> spark.read.format("delta").load("/data").groupBy(col("event_hour")).count.cache
> >
> > I would like to access the "live" data for this data frame without
> deleting the cache (using unpersist()). Whatever I do I always get the
> cached data on subsequent queries. Even adding new column to the query
> doesn't help:
> >
> >
> spark.read.format("delta").load("/data").groupBy(col("event_hour")).count.withColumn("dummy",
> lit("dummy"))
> >
> >
> > I'm able to workaround this using cached sql view, but I couldn't find a
> pure dataFrame solution.
> >
> > Thank you,
> > Tomas
>


Access to live data of cached dataFrame

2019-05-17 Thread Tomas Bartalos
Hello,

I have a cached dataframe:

spark.read.format("delta").load("/data").groupBy(col("event_hour")).count.cache

I would like to access the "live" data for this data frame without deleting
the cache (using unpersist()). Whatever I do, I always get the cached data
on subsequent queries. Even adding a new column to the query doesn't help:

spark.read.format("delta").load("/data").groupBy(col("event_hour")).count.withColumn("dummy",
lit("dummy"))


I'm able to work around this using a cached SQL view, but I couldn't find a
pure DataFrame solution.

Thank you,
Tomas


How to force Spark to honor parquet partitioning

2019-05-03 Thread Tomas Bartalos
Hello,

I have parquet files partitioned by the "event_hour" column.
After reading the parquet files into Spark:
spark.read.format("parquet").load("...")
files from the same parquet partition are scattered across many Spark
partitions.

Example of mapping spark partition -> parquet partition:

Spark partition 1 -> 2019050101, 2019050102, 2019050103
Spark partition 2 -> 2019050101, 2019050103, 2019050104
...
Spark partition 20 -> 2019050101, ...
Spark partition 21 -> 2019050101, ...

As you can see, parquet partition 2019050101 is present in Spark partitions
1, 2, 20 and 21.
As a result, when I write out the dataFrame:
df.write.partitionBy("event_hour").format("parquet").save("...")

many files are created in one parquet partition (in our example it's 4 files,
but in reality it's many more).
To speed up queries, my goal is to write 1 file per parquet partition (1
file per hour).

So far my only solution is to use repartition:
df.repartition(col("event_hour"))

But there is a lot of overhead from the unnecessary shuffle. I'd like to force
Spark to "pick up" the existing parquet partitioning.
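
For completeness, the full workaround I'm using now looks roughly like this (a
sketch):

// Sketch: shuffle so that each event_hour lands in a single Spark partition,
// then partitionBy writes one file per hour - at the cost of a full shuffle.
spark.read.format("parquet").load("...")
  .repartition(col("event_hour"))
  .write
  .partitionBy("event_hour")
  .format("parquet")
  .save("...")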

In my investigation I've found
org.apache.spark.sql.execution.FileSourceScanExec#createNonBucketedReadRDD,
where the initial partitioning happens based on file sizes. There is an
explicit ordering there which causes the parquet partitions to be mixed across
Spark partitions.

thank you for your help,
Tomas


thrift server in "streaming mode"

2019-02-20 Thread Tomas Bartalos
Hello,

For one of our legacy workloads we use spark thriftserver to retrieve data
from Kafka. The pipeline is:
Oracle -- odbc --> Spark Thrift --> Kafka

Spark does some transformations like Avro deserialization, array explode,
etc., but no aggregation.
The main issue we face is that Thrift sends the data back to Oracle only after
all records are processed. This approach results in high memory pressure when
requesting big amounts of data (with a possible OOM).
Is there a possibility to configure Thrift to work in a "streaming" fashion,
processing and sending intermediate chunks?

I've tried Hive 4.0.0-SNAPSHOT and it works in a streaming fashion, receiving
and transmitting at the same time, but it's a single server with no
clustering available.
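
One knob I've come across but haven't verified for this workload (so please
treat it as an assumption on my side): spark.sql.thriftServer.incrementalCollect,
which is supposed to make the Thrift server fetch results partition by partition
instead of materializing everything at once, e.g. starting the server with

./sbin/start-thriftserver.sh --conf spark.sql.thriftServer.incrementalCollect=true

Does anyone know whether this also gets the rows flowing back to the ODBC client
incrementally, or whether it only reduces driver memory pressure?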

BR,
Tomas


Re: Structured streaming from Kafka by timestamp

2019-02-01 Thread Tomas Bartalos
Hello,

sorry for my late answer.
You're right, what I'm doing is a one-time query, not structured
streaming. It will probably be best to describe my use case:
I'd like to expose live data (via JDBC/ODBC) residing in Kafka with the
power of Spark's distributed SQL engine. As the JDBC server I use the Spark
Thrift server.
Since timestamp pushdown is not possible :-(, this is a very cumbersome
task.
Let's say I want to inspect the last 5 minutes of Kafka. First I have to find
out the starting offset for each partition that corresponds to now() - 5 minutes.
Then I can register a Kafka table:

CREATE TABLE ticket_kafka_x USING kafka OPTIONS (
  kafka.bootstrap.servers 'server1,server2,...',
  subscribe 'my_topic',
  startingOffsets '{"my_topic" : {"0" : 48532124, "1" : 49029703, "2" : 49456213, "3" : 48400521}}'
);

Then I can issue queries against this table (data in Kafka is stored in
Avro format, but I've created a custom generic UDF to deserialize it).

select event.id as id, explode(event.picks) as picks from (
  select from_avro(value) as event from ticket_kafka_x
  where timestamp > from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss")
) limit 100;


What's even more irritating, after a few minutes I have to re-create this table
to reflect the last-5-minutes interval, otherwise query performance
suffers from the increasing amount of data to filter.

A colleague of mine was able to make direct queries with timestamp pushdown
in the latest Hive.
How difficult would it be to implement this feature in Spark? Could you point
me to the code where I could have a look?
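
For reference, the offset-resolution step I mentioned above (finding the offset
per partition corresponding to now() - 5 minutes) looks roughly like this - a
sketch using the plain Kafka consumer API, with the broker list and topic name
as placeholders:

// Sketch: ask Kafka which offset corresponds to "5 minutes ago" in each
// partition, then build the startingOffsets JSON for the table definition above.
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new java.util.Properties()
props.put("bootstrap.servers", "server1,server2")
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

val fiveMinAgo = java.lang.Long.valueOf(System.currentTimeMillis() - 5 * 60 * 1000)
val partitions = consumer.partitionsFor("my_topic").asScala
  .map(p => new TopicPartition(p.topic(), p.partition()))
val offsets = consumer.offsetsForTimes(partitions.map(_ -> fiveMinAgo).toMap.asJava).asScala
consumer.close()

// offsetsForTimes returns null for partitions with no message at/after the timestamp
val startingOffsets = offsets.collect { case (tp, oat) if oat != null =>
  "\"" + tp.partition() + "\" : " + oat.offset()
}.mkString("{\"my_topic\" : {", ", ", "}}")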

Thank you,


On Fri, 25 Jan 2019 at 0:32, Shixiong(Ryan) Zhu  wrote:

> Hey Tomas,
>
> From your description, you just ran a batch query rather than a Structured
> Streaming query. The Kafka data source doesn't support filter push down
> right now. But that's definitely doable. One workaround here is setting
> proper  "startingOffsets" and "endingOffsets" options when loading from
> Kafka.
>
> Best Regards,
> Ryan
>
>
> On Thu, Jan 24, 2019 at 10:15 AM Gabor Somogyi 
> wrote:
>
>> Hi Tomas,
>>
>> As a general note, I don't fully understand your use-case. You've mentioned
>> structured streaming, but your query is more like a one-time SQL statement.
>> Kafka doesn't support predicates the way it's integrated with Spark. What can
>> be done from the Spark side is to look up the offset for a specific
>> lowest timestamp and start reading from there.
>>
>> BR,
>> G
>>
>>
>> On Thu, Jan 24, 2019 at 6:38 PM Tomas Bartalos 
>> wrote:
>>
>>> Hello,
>>>
>>> I'm trying to read Kafka via spark structured streaming. I'm trying to
>>> read data within specific time range:
>>>
>>> select count(*) from kafka_table where timestamp > cast('2019-01-23
>>> 1:00' as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP)
>>> ;
>>>
>>>
>>> The problem is that timestamp query is not pushed-down to Kafka, so
>>> Spark tries to read the whole topic from beginning.
>>>
>>>
>>> explain query:
>>>
>>> 
>>>
>>>  +- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 >
>>> 15351480)) && (timestamp#57 < 15352344))
>>>
>>>
>>> Scan
>>> KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production],
>>> start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit)
>>> [key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58]
>>> *PushedFilters: []*, ReadSchema:
>>> struct>>
>>>
>>> Obviously the query takes forever to complete. Is there a solution to
>>> this ?
>>>
>>> I'm using kafka and kafka-client version 1.1.1
>>>
>>>
>>> BR,
>>>
>>> Tomas
>>>
>>


Structured streaming from Kafka by timestamp

2019-01-24 Thread Tomas Bartalos
Hello,

I'm trying to read Kafka via Spark structured streaming, and I want to read
data within a specific time range:

select count(*) from kafka_table where timestamp > cast('2019-01-23 1:00' as
TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP);


The problem is that the timestamp predicate is not pushed down to Kafka, so
Spark tries to read the whole topic from the beginning.


explain query:



 +- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 >
15351480)) && (timestamp#57 < 15352344))


Scan KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production],
start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit)
[key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58]
*PushedFilters: []*, ReadSchema:
struct

Reading compacted Kafka topic is slow

2019-01-24 Thread Tomas Bartalos
Hello Spark folks,

I'm reading a compacted Kafka topic with Spark 2.4, using the direct stream -
KafkaUtils.createDirectStream(...). I have configured the necessary options for
a compacted stream, so it's processed with CompactedKafkaRDDIterator.
It works well, however in case of many gaps in the topic the processing is
very slow and 90% of the time the executors are idle.

I had a look at the source and here are my findings:
Spark first computes the number of records to stream from Kafka (processing
rate * batch window size). This number of records is translated to Kafka
(offset_from, offset_to) ranges, and eventually the iterator reads records
within the offset boundaries.
This works fine until there are many gaps in the topic, which reduce the
real number of processed records.
Let's say we wanted to read 100k records in a 60 sec window. With gaps it
gets down to 10k real records (because 90k are just compacted gaps) in 60 sec.
As a result the executor works for only 6 sec and spends 54 sec doing nothing.
I'd like to utilize the executors as much as possible.

A great feature would be to read 100k real records (skipping the gaps), no
matter what the offsets are.

I've tried to make some improvement with backpressure and a custom
RateEstimator (decorating PidRateEstimator and boosting the rate per
second). I was even able to fully utilize the executors, but my approach
has a big problem when the compacted part of the topic meets the non-compacted
part: the executor tries to read too big a chunk of Kafka and the whole
processing dies.

BR,
Tomas


Re: How to get all input tables of a SPARK SQL 'select' statement

2019-01-23 Thread Tomas Bartalos
This might help:

show tables;
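
If you need the tables referenced by one particular statement (rather than
everything in the catalog), a rough sketch along these lines might work - note
that it leans on Catalyst internals (spark.sessionState and the SQL parser),
which are unstable APIs, so treat it as an idea rather than a supported
approach, and the example statement is made up:

// Sketch: parse the statement without executing it and collect the referenced
// (still unresolved) relations from the logical plan.
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

val stmt = "select * from db1.t1 join db2.t2 on t1.id = t2.id"   // example statement
val plan = spark.sessionState.sqlParser.parsePlan(stmt)
val inputTables = plan.collect {
  case r: UnresolvedRelation => r.tableIdentifier.unquotedString
}.distinct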

On Wed, 23 Jan 2019 at 10:43,  wrote:

> Hi, All,
>
> We need to get all input tables of several SPARK SQL 'select' statements.
>
> We can get those information of Hive SQL statements by using 'explain
> dependency select'.
> But I can't find the equivalent command for SPARK SQL.
>
> Does anyone know how to get this information of a SPARK SQL 'select'
> statement?
>
> Thanks
>
> Boying
>
>
>


cache table vs. parquet table performance

2019-01-15 Thread Tomas Bartalos
Hello,

I'm using the Spark Thrift server and I'm searching for the best performing
solution to query a hot set of data. I'm processing records with a nested
structure, containing subtypes and arrays. One record takes up several KB.

I tried to make some improvement with cache table:

cache table event_jan_01 as select * from events where day_registered =
20190102;


If I understood correctly, the data should be stored in an *in-memory columnar*
format with storage level MEMORY_AND_DISK, so data which doesn't fit in
memory will be spilled to disk (I assume also in columnar format (?)).
I cached 1 day of data (1 M records) and according to the Spark UI storage tab
none of the data was cached in memory and everything was spilled to disk.
The size of the cached data was *5.7 GB*.
Typical queries took ~ 20 sec.

Then I tried to store the data to parquet format:

CREATE TABLE event_jan_01_par USING parquet location "/tmp/events/jan/02"
as select * from event_jan_01;


The whole parquet table took up only *178 MB*,
and typical queries took 5-10 sec.

Is it possible to tune Spark to spill the cached data in parquet format?
Why was the whole cached table spilled to disk with nothing staying in memory?
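
On the DataFrame side, what I'm planning to experiment with next looks roughly
like this (a sketch; the storage level is just my guess at what to try, not
something I've measured yet):

// Sketch: cache the same day with an explicit storage level instead of the
// MEMORY_AND_DISK default mentioned above, to see whether a serialized
// in-memory-only cache behaves differently.
import org.apache.spark.storage.StorageLevel

val eventsJan01 = spark.table("events").where("day_registered = 20190102")
eventsJan01.persist(StorageLevel.MEMORY_ONLY_SER)
eventsJan01.count()   // materialize the cache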

Spark version: 2.4.0

Best regards,
Tomas