Hi Mark,

At present, the Spark partitions are basically equivalent to the number of
regions in the underlying HBase table. This is typically something you can
control yourself, either using pre-splitting or salting (
https://phoenix.apache.org/faq.html#Are_there_any_tips_for_optimizing_Phoenix).
Given that you have 450+ partitions though, it sounds like you should be
able to achieve a decent level of parallelism, but that's a knob you can
fiddle with. It might also be useful to look at Spark's "repartition"
operation if you have idle Spark executors.
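
As a rough sketch, assuming the DataFrame 'df' from your snippet below
(the target count of 900 is an arbitrary number you'd tune):

# repartition() reshuffles the already-loaded rows across more tasks. It
# doesn't change how many regions Phoenix scans, but it can keep
# otherwise idle executors busy. Note that it incurs a full shuffle.
df_wide = df.repartition(900)
df_wide.rdd.getNumPartitions()  # now 900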

The partitioning is sort of orthogonal to the primary key layout and the
resulting query efficiency, but the strategy you've taken with your schema
seems fairly sensible to me. Given that your primary key is the 'id' field,
the query you're using is going to be much more efficient than, e.g.,
filtering on the 'title' column. Iterating on your schema and queries
using straight SQL, and then applying that to Spark, is probably a good
strategy here to get more familiar with query performance.
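
Relatedly, on the Spark side it may help to express the key lookup
through the DataFrame API instead of a registered temp table, so the
filter can be handed to the phoenix-spark relation as a pushdown rather
than evaluated in Spark. A rough sketch, again assuming 'df' from your
snippet (whether this actually becomes a point/skip scan is worth
verifying with EXPLAIN in sqlline):

# The In filter can be pushed down to Phoenix as a WHERE clause instead
# of being applied after a full scan on the Spark side.
ids = ['1', '2']  # primary keys returned by your earlier metadata query
df_imgs = df.filter(df.ID.isin(ids)).select('IMAGE')

# For comparison, a filter on a non-PK column like 'title' would force a
# full scan on the Phoenix side.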

If you're reading the binary 'data' column in Spark and seeing a lot of
network overhead, one thing to be aware of is the present Phoenix MR /
Spark code isn't location aware, so executors are likely reading big chunks
of data from another node. There are a few patches in to address this,
but they're not in a released version yet:

https://issues.apache.org/jira/browse/PHOENIX-3600
https://issues.apache.org/jira/browse/PHOENIX-3601

Good luck!

Josh

On Thu, Jan 19, 2017 at 11:30 AM, Mark Heppner <heppner.m...@gmail.com>
wrote:

> Our use case is to analyze images using Spark. The images are typically
> ~1MB each, so in order to prevent the small files problem in HDFS, we went
> with HBase and Phoenix. For 20+ million images and metadata, this has been
> working pretty well so far. Since this is pretty new to us, we didn't
> create a robust design:
>
> CREATE TABLE IF NOT EXISTS mytable
> (
>     id VARCHAR(36) NOT NULL PRIMARY KEY,
>     title VARCHAR,
>     ...
>     image.dtype VARCHAR(12),
>     image.width UNSIGNED_INT,
>     image.height UNSIGNED_INT,
>     image.data VARBINARY
> )
>
> Most queries are on the metadata, so all of that is kept in the default
> column family. Only the image data is stored in a secondary column family.
> Additional indexes are created anyway, so the main table isn't usually
> touched.
>
> We first run a Phoenix query to check if there are any matches. If so,
> then we start a Spark job on the images. The primary keys are sent to the
> PySpark job, which then grabs the images based on the primary keys:
>
> df = sqlContext.read \
>     .format('org.apache.phoenix.spark') \
>     .option('table', 'mytable') \
>     .option('zkUrl', 'localhost:2181:/hbase-unsecure') \
>     .load()
> df.registerTempTable('mytable')
>
> query = 'SELECT IMAGE FROM mytable WHERE ID = 1 OR ID = 2 ...'
> df_imgs = sqlContext.sql(query)
>
> When this was first designed, we thought since the lookup was by primary
> key, it would be smart enough to do a skip scan, but it appears to be doing
> a full scan. The df_imgs.rdd.getNumPartitions() ends up being 450+, which
> matches up with the number of split files in HDFS.
>
> Would it be better to use a foreign key and split the tables:
>
> CREATE TABLE IF NOT EXISTS mytable
> (
>     id VARCHAR(36) NOT NULL PRIMARY KEY,
>     title VARCHAR,
>     image_id VARCHAR(36)
> )
> CREATE TABLE IF NOT EXISTS images
> (
>     image_id VARCHAR(36) NOT NULL PRIMARY KEY,
>     dtype VARCHAR(12),
>     width UNSIGNED_INT,
>     height UNSIGNED_INT,
>     data VARBINARY
> )
>
> If the first query grabs the image_ids and sends them to Spark, would Spark
> be able to handle the query more efficiently?
>
> If this is a better design, is there any way of moving the "image" column
> family from "mytable" to the default column family of the new "images"
> table? Is it possible to create the new table with the "image_id"s, make
> the foreign keys, then move the column family into the new table?
>
>
> --
> Mark Heppner
>
