Run an EXPLAIN on your query to confirm that it's doing a full scan and not a skip scan.
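As a minimal sketch of the IN-based rewrite suggested below: build the predicate as a string, EXPLAIN it first, then hand it to the same sqlContext.sql() call the thread already uses. The in_clause helper is hypothetical; mytable and ID come from the thread's own example.

```python
def in_clause(column, keys):
    # Render "COL IN ('k1', 'k2', ...)" for a list of string primary keys.
    # Phoenix can typically service this with a point/skip scan on the PK,
    # optionally nudged with a /*+ SKIP_SCAN */ hint.
    quoted = ", ".join("'{}'".format(k) for k in keys)
    return "{} IN ({})".format(column, quoted)

ids = ["1", "2", "3"]
sql = "SELECT IMAGE FROM mytable WHERE " + in_clause("ID", ids)

# Check the plan first (e.g. in sqlline): "EXPLAIN " + sql
# Then, in the PySpark session from the thread:
# df_imgs = sqlContext.sql(sql)
print(sql)  # SELECT IMAGE FROM mytable WHERE ID IN ('1', '2', '3')
```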
I typically use an IN () clause instead of OR, especially with compound
keys. I have also had to hint queries to use a skip scan, e.g.
/*+ SKIP_SCAN */. Phoenix seems to do a very good job of not reading data
from column families that aren't needed by the query, so I think your
schema design is fine.

> On Jan 19, 2017, at 10:30 AM, Mark Heppner <heppner.m...@gmail.com> wrote:
>
> Thanks for the quick reply, Josh!
>
> For our demo cluster, we have 5 nodes, so the table was already set to 10
> salt buckets. I know you can increase the salt buckets after the table is
> created, but how do you change the split points? The repartition in Spark
> seemed to be extremely inefficient, so we were trying to skip it and keep
> the 400+ default partitions.
>
> The biggest issue we're facing is that as Spark goes through the
> partitions during the scan, it becomes progressively slower towards the
> end. Around task 380/450, it slows to a halt, eventually timing out
> around 410 and getting killed. We have no idea if this is something with
> Spark, YARN, or HBase, so that's why we were brainstorming with the
> foreign key-based layout, hoping that the files on HDFS would be more
> compacted.
>
> We haven't noticed much network overhead, nor have we seen CPU or RAM
> usage run high. Our nodes are pretty big, 32 cores and 256 GB RAM each,
> connected on a 10 GbE network. Even if our query is for 80-100 rows, the
> Spark job still slows to a crawl at the end, but that should only be
> about 80 MB of data being pulled out of Phoenix into the executors. I
> guess we should have verified that the Phoenix+Spark plugin achieves data
> locality, but there isn't anything that says otherwise. Even if it
> doesn't have data locality, we have no idea why it would progressively
> slow down as it reaches the end of the scan/filter.
>
> The images are converted to NumPy arrays, then saved as binary strings
> into Phoenix.
> In Spark, it is fairly quick to convert the binary string back to the
> NumPy array. This also allows us to use GET_BYTE() from Phoenix to
> extract specific values within the array, without going through Spark at
> all.
>
> Do you have any other architecture recommendations for our use case?
> Would storing the images directly in HBase be any better?
>
>> On Thu, Jan 19, 2017 at 12:02 PM, Josh Mahonin <jmaho...@gmail.com> wrote:
>> Hi Mark,
>>
>> At present, the Spark partitions are basically equivalent to the number
>> of regions in the underlying HBase table. This is typically something
>> you can control yourself, either using pre-splitting or salting
>> (https://phoenix.apache.org/faq.html#Are_there_any_tips_for_optimizing_Phoenix).
>> Given that you have 450+ partitions, it sounds like you should be able
>> to achieve a decent level of parallelism, but that's a knob you can
>> fiddle with. It might also be useful to look at Spark's "repartition"
>> operation if you have idle Spark executors.
>>
>> The partitioning is sort of orthogonal to the primary key layout and
>> the resulting query efficiency, but the strategy you've taken with your
>> schema seems fairly sensible to me. Given that your primary key is the
>> 'id' field, the query you're using is going to be much more efficient
>> than, e.g., filtering on the 'title' column. Iterating on your schema
>> and queries using straight SQL, then applying that to Spark afterwards,
>> is probably a good strategy here to get more familiar with query
>> performance.
>>
>> If you're reading the binary 'data' column in Spark and seeing a lot of
>> network overhead, one thing to be aware of is that the present Phoenix
>> MR / Spark code isn't location aware, so executors are likely reading
>> big chunks of data from another node.
>> There are a few patches to address this, but they're not in a released
>> version yet:
>>
>> https://issues.apache.org/jira/browse/PHOENIX-3600
>> https://issues.apache.org/jira/browse/PHOENIX-3601
>>
>> Good luck!
>>
>> Josh
>>
>>> On Thu, Jan 19, 2017 at 11:30 AM, Mark Heppner <heppner.m...@gmail.com>
>>> wrote:
>>> Our use case is to analyze images using Spark. The images are typically
>>> ~1 MB each, so in order to avoid the small-files problem in HDFS, we
>>> went with HBase and Phoenix. For 20+ million images and their metadata,
>>> this has been working pretty well so far. Since this is pretty new to
>>> us, we didn't create a robust design:
>>>
>>> CREATE TABLE IF NOT EXISTS mytable
>>> (
>>>     id VARCHAR(36) NOT NULL PRIMARY KEY,
>>>     title VARCHAR,
>>>     ...
>>>     image.dtype VARCHAR(12),
>>>     image.width UNSIGNED_INT,
>>>     image.height UNSIGNED_INT,
>>>     image.data VARBINARY
>>> )
>>>
>>> Most queries are on the metadata, so all of that is kept in the default
>>> column family. Only the image data is stored in a secondary column
>>> family. Additional indexes are created anyway, so the main table isn't
>>> usually touched.
>>>
>>> We first run a Phoenix query to check if there are any matches. If so,
>>> we start a Spark job on the images. The primary keys are sent to the
>>> PySpark job, which then grabs the images based on the primary keys:
>>>
>>> df = sqlContext.read \
>>>     .format('org.apache.phoenix.spark') \
>>>     .option('table', 'mytable') \
>>>     .option('zkUrl', 'localhost:2181:/hbase-unsecure') \
>>>     .load()
>>> df.registerTempTable('mytable')
>>>
>>> df_imgs = sqlContext.sql(
>>>     'SELECT IMAGE FROM mytable WHERE ID = 1 OR ID = 2 ...'
>>> )
>>>
>>> When this was first designed, we thought that since the lookup was by
>>> primary key, it would be smart enough to do a skip scan, but it appears
>>> to be doing a full scan. df_imgs.rdd.getNumPartitions() ends up being
>>> 450+, which matches the number of split files in HDFS.
>>>
>>> Would it be better to use a foreign key and split the tables:
>>>
>>> CREATE TABLE IF NOT EXISTS mytable
>>> (
>>>     id VARCHAR(36) NOT NULL PRIMARY KEY,
>>>     title VARCHAR,
>>>     image_id VARCHAR(36)
>>> )
>>>
>>> CREATE TABLE IF NOT EXISTS images
>>> (
>>>     image_id VARCHAR(36) NOT NULL PRIMARY KEY,
>>>     dtype VARCHAR(12),
>>>     width UNSIGNED_INT,
>>>     height UNSIGNED_INT,
>>>     data VARBINARY
>>> )
>>>
>>> If the first query grabs the image_ids and sends them to Spark, would
>>> Spark be able to handle the query more efficiently?
>>>
>>> If this is a better design, is there any way of moving the "image"
>>> column family from "mytable" to the default column family of the new
>>> "images" table? Is it possible to create the new table with the
>>> "image_id"s, make the foreign keys, then move the column family into
>>> the new table?
>>>
>>> --
>>> Mark Heppner
>>
>
>
> --
> Mark Heppner
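As an aside on the NumPy-to-binary storage discussed up-thread, the round trip can be sketched like this; the dtype and shape here are illustrative, and in the schema above they would come from the image.dtype/width/height columns rather than being hard-coded:

```python
import numpy as np

# A small stand-in for an image array.
img = np.arange(12, dtype=np.uint8).reshape(3, 4)

# Serialize to the raw byte string stored in the VARBINARY column.
blob = img.tobytes()

# Restore on the Spark side; dtype and shape must be carried alongside
# the blob (the thread's image.dtype/width/height columns serve that role).
restored = np.frombuffer(blob, dtype=np.uint8).reshape(3, 4)

assert np.array_equal(img, restored)
```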