Run an EXPLAIN on your query to confirm that it's doing a full scan and not a 
skip scan.

I typically use an IN () clause instead of OR, especially with compound keys. I 
have also had to hint queries to use a skip scan, e.g. /*+ SKIP_SCAN */.
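
For example, something along these lines against the schema from your original 
message (the literal ids are just placeholders):

EXPLAIN SELECT image.data FROM mytable WHERE id IN ('id-1', 'id-2', 'id-3');

-- if the plan still reports a full scan, try forcing the skip scan with a hint
SELECT /*+ SKIP_SCAN */ image.data FROM mytable
WHERE id IN ('id-1', 'id-2', 'id-3');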

Phoenix seems to do a very good job of not reading data from column families 
that aren't needed by the query, so I think your schema design is fine.

> On Jan 19, 2017, at 10:30 AM, Mark Heppner <heppner.m...@gmail.com> wrote:
> 
> Thanks for the quick reply, Josh!
> 
> For our demo cluster, we have 5 nodes, so the table was already set to 10 
> salt buckets. I know you can increase the salt buckets after the table is 
> created, but how do you change the split points? The repartition in Spark 
> seemed to be extremely inefficient, so we were trying to skip it and keep the 
> 400+ default partitions.
> 
> The biggest issue we're facing is that as Spark goes through the partitions 
> during the scan, it becomes progressively slower towards the end. Around task 
> 380/450, it grinds to a halt, eventually timing out around task 410 and 
> getting killed. We have no idea whether this is something with Spark, YARN, or 
> HBase, which is why we were brainstorming about using the foreign key-based 
> layout, hoping that the files on HDFS would be more compact.
> 
> We haven't noticed too much network overhead, nor have we seen CPU or RAM 
> usage too high. Our nodes are pretty big, 32 cores and 256 GB RAM each, 
> connected on a 10 GbE network. Even if our query is for 80-100 rows, the 
> Spark job still slows to a crawl at the end, but that should really only be 
> about 80 MB of data it would be pulling out of Phoenix into the executors. I 
> guess we should have verified that the Phoenix+Spark plugin actually achieves 
> data locality, but we haven't seen anything that says otherwise. Even if it 
> doesn't, that still wouldn't explain why the job progressively slows down as 
> it reaches the end of the scan/filter.
> 
> The images are converted to a NumPy array, then saved as a binary string into 
> Phoenix. In Spark, this is fairly quick to convert the binary string back to 
> the NumPy array. This also allows us to use GET_BYTE() from Phoenix to 
> extract specific values within the array, without going through Spark at all. 
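> 
> For example, roughly (the id value and byte offset are just placeholders):
> 
> SELECT GET_BYTE(image.data, 0) FROM mytable WHERE id = 'some-id';
> 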
> Do you have any other architecture recommendations for our use case? Would 
> storing the images directly in HBase be any better?
> 
>> On Thu, Jan 19, 2017 at 12:02 PM, Josh Mahonin <jmaho...@gmail.com> wrote:
>> Hi Mark,
>> 
>> At present, the Spark partitions are basically equivalent to the number of 
>> regions in the underlying HBase table. This is typically something you can 
>> control yourself, either using pre-splitting or salting 
>> (https://phoenix.apache.org/faq.html#Are_there_any_tips_for_optimizing_Phoenix).
>>  Given that you have 450+ partitions though, it sounds like you should be 
>> able to achieve a decent level of parallelism, but that's a knob you can 
>> fiddle with. It might also be useful to look at Spark's "repartition" 
>> operation if you have idle Spark executors.
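>> 
>> For example, roughly (the bucket count and split points are only 
>> illustrative, and the '...' stands for the rest of your columns; you'd pick 
>> values based on your key distribution and cluster size):
>> 
>> CREATE TABLE mytable (id VARCHAR(36) NOT NULL PRIMARY KEY, ...)
>>     SALT_BUCKETS = 20;
>> -- or pre-split on explicit key boundaries instead of salting:
>> CREATE TABLE mytable (id VARCHAR(36) NOT NULL PRIMARY KEY, ...)
>>     SPLIT ON ('4', '8', 'c');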
>> 
>> The partitioning is sort of orthogonal to the primary key layout and the 
>> resulting query efficiency, but the strategy you've taken with your schema 
>> seems fairly sensible to me. Given that your primary key is the 'id' field, 
>> the query you're using is going to be much more efficient than, e.g., 
>> filtering on the 'title' column. Iterating on your schema and queries using 
>> straight SQL, and then applying that to Spark afterwards, is probably a good 
>> strategy here to get more familiar with query performance.
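>> 
>> Concretely, for example (column names from your schema, literal values are 
>> just placeholders):
>> 
>> -- point lookup / skip scan on the primary key
>> SELECT image.data FROM mytable WHERE id IN ('a', 'b');
>> -- has to scan the whole table, unless an index on title can be used
>> SELECT image.data FROM mytable WHERE title = 'some title';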
>> 
>> If you're reading the binary 'data' column in Spark and seeing a lot of 
>> network overhead, one thing to be aware of is the present Phoenix MR / Spark 
>> code isn't location aware, so executors are likely reading big chunks of 
>> data from another node. There's a few patches in to address this, but 
>> they're not in a released version yet:
>> 
>> https://issues.apache.org/jira/browse/PHOENIX-3600
>> https://issues.apache.org/jira/browse/PHOENIX-3601
>> 
>> Good luck!
>> 
>> Josh
>> 
>> 
>> 
>> 
>>> On Thu, Jan 19, 2017 at 11:30 AM, Mark Heppner <heppner.m...@gmail.com> 
>>> wrote:
>>> Our use case is to analyze images using Spark. The images are typically 
>>> ~1MB each, so in order to prevent the small files problem in HDFS, we went 
>>> with HBase and Phoenix. For 20+ million images and metadata, this has been 
>>> working pretty well so far. Since this is pretty new to us, we didn't 
>>> create a robust design:
>>> 
>>> CREATE TABLE IF NOT EXISTS mytable
>>> (
>>>     id VARCHAR(36) NOT NULL PRIMARY KEY,
>>>     title VARCHAR,
>>>     ...
>>>     image.dtype VARCHAR(12),
>>>     image.width UNSIGNED_INT,
>>>     image.height UNSIGNED_INT,
>>>     image.data VARBINARY
>>> )
>>> 
>>> Most queries are on the metadata, so all of that is kept in the default 
>>> column family. Only the image data is stored in a secondary column family. 
>>> Additional indexes are created anyway, so the main table isn't usually 
>>> touched.
>>> 
>>> We first run a Phoenix query to check if there are any matches. If so, then 
>>> we start a Spark job on the images. The primary keys are sent to the 
>>> PySpark job, which then grabs the images based on the primary keys:
>>> 
>>> df = sqlContext.read \
>>>     .format('org.apache.phoenix.spark') \
>>>     .option('table', 'mytable') \
>>>     .option('zkUrl', 'localhost:2181:/hbase-unsecure') \
>>>     .load()
>>> df.registerTempTable('mytable')
>>> 
>>> query = 'SELECT IMAGE FROM mytable WHERE ID = 1 OR ID = 2 ...'
>>> df_imgs = sqlContext.sql(query)
>>> 
>>> When this was first designed, we thought since the lookup was by primary 
>>> key, it would be smart enough to do a skip scan, but it appears to be doing 
>>> a full scan. The df_imgs.rdd.getNumPartitions() ends up being 450+, which 
>>> matches up with the number of split files in HDFS.
>>> 
>>> Would it be better to use a foreign key and split the tables:
>>> 
>>> CREATE TABLE IF NOT EXISTS mytable
>>> (
>>>     id VARCHAR(36) NOT NULL PRIMARY KEY,
>>>     title VARCHAR,
>>>     image_id VARCHAR(36)
>>> )
>>> CREATE TABLE IF NOT EXISTS images
>>> (
>>>     image_id VARCHAR(36) NOT NULL PRIMARY KEY,
>>>     dtype VARCHAR(12),
>>>     width UNSIGNED_INT,
>>>     height UNSIGNED_INT,
>>>     data VARBINARY
>>> )
>>> 
>>> If the first query grabs the image_ids and sends them to Spark, would Spark 
>>> be able to handle the query more efficiently?
>>> 
>>> If this is a better design, is there any way of moving the "image" column 
>>> family from "mytable" to the default column family of the new "images" 
>>> table? Is it possible to create the new table with the "image_id"s, make 
>>> the foreign keys, then move the column family into the new table?
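>>> 
>>> Would something like an UPSERT SELECT work for that? Roughly (column names 
>>> from the proposed schema above, and presumably it would have to be batched 
>>> by ranges of ids for 20+ million rows):
>>> 
>>> UPSERT INTO images (image_id, dtype, width, height, data)
>>>     SELECT id, image.dtype, image.width, image.height, image.data
>>>     FROM mytable;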
>>> 
>>> 
>>> -- 
>>> Mark Heppner
>> 
> 
> 
> 
> -- 
> Mark Heppner
