Cool, that's a good find. Re-stating what you're seeing: the distribution of your HBase table (region splits) doesn't match an even distribution of the data in the HBase table. Some regions have more data than other regions.

Typically, applications reading from HBase will launch workers based on the Region split points, or modulo some maximum number of "work items" (tasks, in your case, I'd guess).

I'd take a look at the amount of data in HDFS for each region in your table, and see if you can find any skew. If there are large region(s), you can try to split them. Or, you can change the split threshold from the default of 10G (iirc) to a smaller number and let the system do it for you.

On 3/15/18 5:49 AM, Stepan Migunov wrote:
The table is about 300GB in hbase.
I've done some more research and now my test is very simple - I'm tryng to
calculate count of records of the table. No "distincts" and etc., just
phoenixTableAsDataFrame(...).count().

And now I see the issue - Spark creates about 400 task (14 executors),
starts calculation, speed is pretty good. Hbase shows about 1000 requests
per second. But then Sparks stops tasks as completed. I can see that Spark
have read only 20% of records, but completed 50% tasks. HBase shows only 100
requests per second. When Sparks "thinks" that 99% completed (only 5 tasks
left), actually it read only 70% records. The rest of work will be done by 5
tasks with 1-2 request per second...

Is the any way to force Spark distribute workload evenly? I have tried to
pre-split my Phonix table (now it has about 1200 regions), but it did't
help.

-----Original Message-----
From: Josh Elser [mailto:els...@apache.org]
Sent: Friday, March 9, 2018 2:17 AM
To: user@phoenix.apache.org
Subject: Re: Phoenix as a source for Spark processing

How large is each row in this case? Or, better yet, how large is the table
in HBase?

You're spreading out approximately 7 "clients" to each Regionserver fetching
results (100/14). So, you should have pretty decent saturation from Spark
into HBase.

I'd be taking a look at the EXPLAIN plan for your SELECT DISTINCT to really
understand what Phoenix is doing. For example, are you getting ample
saturation of the resources that your servers have available (32core/128Gb
memory is pretty good). Validating how busy Spark is actually keeping HBase,
and how much time is spent transforming the data would be good. Or, another
point, are you excessively scanning data in the system which you could
otherwise preclude by a different rowkey structure via logic such as a
skip-scan (which would be shown in the EXPLAIN plan).

You may actually find that using the built-in UPSERT SELECT logic may
out-perform the Spark integration since you aren't actually doing any
transformation logic inside of Spark.


On 3/5/18 3:14 PM, Stepan Migunov wrote:
Hi Josh, thank you for response!

Our cluster has 14 nodes (32 cores each/128 GB memory). The source
Phoenix table contains about 1 billion records (100 columns). We start
a Spark's job with about 100 executors. Spark executes SELECT from the
source table (select 6 columns with DISTINCT) and writes down output
to another Phoenix table. Expected that the target table will contains
about 100 million records.
HBase has 14 region servers, both tables salted with SALT_BUCKETS=42.
Spark's job running via Yarn.


-----Original Message-----
From: Josh Elser [mailto:els...@apache.org]
Sent: Monday, March 5, 2018 9:14 PM
To: user@phoenix.apache.org
Subject: Re: Phoenix as a source for Spark processing

Hi Stepan,

Can you better ballpark the Phoenix-Spark performance you've seen (e.g.
how much hardware do you have, how many spark executors did you use,
how many region servers)? Also, what versions of software are you using?

I don't think there are any firm guidelines on how you can solve this
problem, but you've found the tools available for you.

* You can try Phoenix+Spark to run over the Phoenix tables in place
* You can use Phoenix+Hive to offload the data into Hive for queries

If Phoenix-Spark wasn't fast enough, I'd imagine using the
Phoenix-Hive integration to query the data would be similarly not fast
enough.

It's possible that the bottleneck is something we could fix in the
integration, or fix configuration of Spark and/or Phoenix. We'd need
you to help quantify this better :)

On 3/4/18 6:08 AM, Stepan Migunov wrote:
In our software we need to combine fast interactive access to the
data with quite complex data processing. I know that Phoenix intended
for fast access, but hoped that also I could be able to use Phoenix
as a source for complex processing with the Spark.  Unfortunately,
Phoenix + Spark shows very poor performance. E.g., querying big
(about billion records) table with distinct takes about 2 hours. At
the same time this task with Hive source takes a few minutes. Is it
expected? Does it mean that Phoenix is absolutely not suitable for
batch processing with spark and I should duplicate data to Hive and
process it with Hive?

Reply via email to