The wikisearch example provides something similar to a local index. Rather
than spreading the index and data across two tablets, a single row in Accumulo
contains both, stored in separate column families. Iterator trees are
used to execute queries and retrieve data within that row.
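
As a rough illustration of that layout, here is a toy in-memory model. It is
not the wikisearch code and does not use the Accumulo API; the column family
names "idx" and "doc", the shard function, and all helper names are made up
for this sketch. It only shows why a term lookup and the data fetch can both
stay inside one row when keys sort by (row, family, qualifier).

```python
from bisect import bisect_left

def build_table(docs, shard_of):
    """Return entries sorted by (row, family, qualifier), like one sorted tablet."""
    entries = []
    for doc_id, text in docs.items():
        row = shard_of(doc_id)
        # Data entry: the document itself, under the hypothetical "doc" family.
        entries.append(((row, "doc", doc_id), text))
        for term in set(text.lower().split()):
            # Index entry: lives in the same row as the document it points to.
            entries.append(((row, "idx", term + "\0" + doc_id), ""))
    entries.sort()
    return entries

def scan(entries, start, stop):
    """Yield entries whose key is in [start, stop), in sorted order."""
    keys = [key for key, _ in entries]
    i = bisect_left(keys, start)
    while i < len(entries) and entries[i][0] < stop:
        yield entries[i]
        i += 1

def query_row(entries, row, term):
    """Look up a lowercase term in one row's index, then fetch the data from that row."""
    prefix = term + "\0"
    result = {}
    index_hits = scan(entries, (row, "idx", prefix), (row, "idx", term + "\x01"))
    for (_, _, qualifier), _ in index_hits:
        doc_id = qualifier[len(prefix):]
        for _, text in scan(entries, (row, "doc", doc_id), (row, "doc", doc_id + "\0")):
            result[doc_id] = text
    return result
```

Both scans in query_row are bounded by the same row, so in this model nothing
outside that row is ever touched, which is the property the local index needs.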


On Thu, May 1, 2014 at 2:24 AM, James Taylor <jamestay...@apache.org> wrote:

> Thanks for the explanations, Josh. This sounds very doable. Few more
> comments inline below.
>
> James
>
>
> On Wed, Apr 30, 2014 at 8:37 AM, Josh Elser <josh.el...@gmail.com> wrote:
>
> >
> >
> > On 4/30/14, 3:33 AM, James Taylor wrote:
> >
> >> On Tue, Apr 29, 2014 at 11:57 AM, Josh Elser <josh.el...@gmail.com>
> >> wrote:
> >>
> >>>> @Josh - it's less baked in than you'd think on the client where the
> >>>> query parsing, compilation, optimization, and orchestration occurs. The
> >>>> client/server interaction is hidden behind the ConnectionQueryServices
> >>>> interface, the scanning behind ResultIterator (in
> >>>> particular ScanningResultIterator), the DML behind MutationState, and
> >>>> KeyValue interaction behind KeyValueBuilder. Yes, though, it would
> >>>> require some more abstraction, but probably not too bad, though. On the
> >>>> server-side, the entry points would all be different and that's where
> >>>> I'd need your insights for what's possible.
> >>>>
> >>>>
> >>> Definitely. I'm a little concerned about what's expected to be provided
> >>> by the "database" (HBase, Accumulo) as I believe HBase is a little more
> >>> flexible in allowing writes internally where Accumulo has thus far said
> >>> "you're gonna have a bad time".
> >>>
> >>
> >>
> >> Tell me more about what you mean by "allowing writes internally".
> >>
> >
> > Haha, sorry, that was a sufficiently ominous statement with insufficient
> > context.
> >
> > For discussion's sake, let's just say HBase coprocessors and Accumulo
> > iterators are equivalent, purely in the scope of "running server-side
> > code" (in the RegionServer/TabletServer). However, there is a notable
> > difference in the pipeline where each of those is implemented.
> >
> > Coprocessors have built-in hooks that let you get updates on
> > PUT/GET/DELETE/etc as well as pre and post each of those operations. In
> > other words, they provide hooks at a "high database level".
> >
> > Iterators tend to be much closer to the data itself, only dealing with
> > streams of data (other iterators stacked on one another). Iterators
> > implement versioning, visibilities, and can even implement complex
> > searches. The downside of this approach is that iterators lack any means
> > to safely write data _outside of the sorted Key-Value pairs in the tablet
> > currently being processed_. It's possible to make in-tablet updates, but
> > sorted order within a large tablet might make this difficult as well.
> >
> > This is why I was thinking percolator would be a better solution, as it's
> > meant for handling updates like this server-side. However, I imagine it
> > would be possible, in the short-term, to make some separate process
> > between Phoenix and Accumulo which handles writes.
>
>
> Another fallback might be to do global index maintenance on the client.
> It'd just be more expensive, especially if you want to handle out-of-order
> updates (which are particularly tricky, as you have to get multiple
> versions of the rows to work out all the different scenarios here).
>
> A second fallback might be to support only local indexing. Does Accumulo
> have the concept of a "custom load balancer" that would allow you to
> co-locate two regions from different tables? The local-index feature has
> kind of driven some feature requests on that front for HBase - mainly
> callbacks when a region is split or re-located. The rows of the local index
> are prefixed with the region start key to keep them together and identify
> them.
>
> >
> >
> >
> >>
> >>>
> >>>> @Eric - I agree about having txn support (probably through snapshot
> >>>> isolation) by controlling the timestamp, and then layering indexing on
> >>>> top of that. That's where we're headed. But I wouldn't let that stop the
> >>>> effort - it would just be layered on top of what's already there. FWIW,
> >>>> there's another interesting indexing model that has been termed "local
> >>>> indexing" (https://github.com/Huawei-Hadoop/hindex) which is being worked
> >>>> on right now (should be available in either our 4.1 or 4.2 release). In
> >>>> this model, the table data and index data are co-located on the same
> >>>> region server through a kind of "buddy" region mechanism. The advantage
> >>>> is that you take no hit at write time, as you're writing both the index
> >>>> and table data together. Not sure how/if this would transfer over to the
> >>>> Accumulo world.
> >>>>
> >>>>
> >>> Interesting. Given that Accumulo doesn't have a fixed column family
> >>> schema, this might make index generation even easier (maybe "cleaner"
> >>> is the proper word). You could easily co-locate the indices with the
> >>> data, giving them a proper name.
> >>>
> >>>
> >> With HBase, you can do something similar (though, you're right, you'd
> >> need to create the column family upfront or take the hit of creating it
> >> dynamically - that's a nice feature that Accumulo has). The reason this
> >> doesn't work is that you need a different row key so that the index rows
> >> are ordered according to their indexed column values. If you put it in a
> >> column family of the data table, they're ordered in the same way as the
> >> data table. This makes range scans over index tables very expensive, as
> >> the rows would need to be re-ordered.
> >>
> >>
> > Ah, of course. You need the term up front to make it sort properly.
> >
> >
> >
> >>> Problem still exists that we don't have a solid way to do this solely
> >>> inside of Accumulo ATM. I'd imagine that if someone stepped up to
> >>> implement coprocessors, we'd be taking the route of a separate,
> >>> standalone process (as opposed to in-RegionServer). Hypothetically, we
> >>> could do the same for Phoenix in the short-term.
> >>>
> >>> Can you quantify what would be expected by Accumulo to integrate with
> >>> Phoenix (maybe list what exactly is done inside of HBase at a high
> >>> level?) so that we could give some more targeted ideas/feelings as to
> >>> what the level of work would be inside Accumulo?
> >>>
> >>
> >>
> >> There's not a lot of hard/fast requirements. Most of what Phoenix does
> >> is to optimize performance by leveraging the capabilities of the server.
> >> In terms of hard/fast requirements, these come to mind:
> >> - data is returned in row key order from range scans
> >> - a scan may set a start key/stop key to do a range scan
> >> - a row key may be composed of arbitrary bytes
> >> - a client may "pre-split" a table by providing the region boundaries at
> >> table create time (we rely on this for salting to prevent hotspotting:
> >> http://phoenix.incubator.apache.org/salted.html).
> >> - the client has access to the region boundaries of a table (this allows
> >> for better parallelization)
> >> - the client may chunk up a scan into smaller, multiple scans and run
> >> them in parallel
> >> Some of these may be a bit squishy, as there may be existing machinery
> >> already in your client programming model that could be leveraged. The
> >> client API of HBase, for example, does not provide the ability out of the
> >> box to parallelize a scan, so this is something Phoenix had to add on top
> >> (through chunking up scans at or within region boundaries).
> >>
> >
> > All of these look fine. The Accumulo BatchScanner does that
> > parallelization for you which is really nice (handling tablet migration
> > and all that fun stuff transparently).
>
>
> That's nice that Accumulo has this built-in. Does it allow the client to
> specify the split points for the scan in some way?
>
> >
> >
> >
> >> Phoenix manages the metadata of your tables (tables, columns, indexes,
> >> views, etc) in an HBase table. DDL statements such as CREATE TABLE, DROP
> >> TABLE, ALTER TABLE are atomic, transactional operations b/c we don't
> >> want our metadata table to get in a corrupt state. To accomplish this, we
> >> rely on:
> >> - setting a "split policy" that ensures that the table data for a given
> >> "tenant" (we support multi-tenancy:
> >> http://phoenix.incubator.apache.org/multi-tenancy.html) stay together
> >> in the same region.
> >> - putting the data using an API that guarantees that either the entire
> >> batch of mutations succeed or fail completely.
> >> Again, these are details of our implementation on HBase which do not
> >> necessarily need to be implemented in the same way on a different
> >> system.
> >>
> >
> > I'd have to look again at how our mutation failures are handled (or
> > someone else can chime in). This might be something to keep an eye on
> > depending on the distribution of mutations in regards to tables.
> >
> >
> >> Phoenix supports sequences which are atomically incremented values. This
> >> is done through a coprocessor currently, due to some limitations with the
> >> HBase Increment API, but the idea is the same as an atomic increment.
> >>
> >
> > Conditional Mutations in the about-to-be-released version 1.6.0 will
> > provide this.
> >
> >
> >> Phoenix does the following push down:
> >> - the WHERE clause gets transformed into three things: a start/stop key
> >> of a scan, a skip scan filter to efficiently navigate the key space (see
> >> http://phoenix-hbase.blogspot.com/2013/05/demystifying-skip-scan-in-phoenix.html),
> >> and a custom filter to rule out a row based on some java code that does
> >> expression evaluation.
> >> - the GROUP BY clause gets pushed to the server and a coprocessor runs
> >> the scan on each region so that the client doesn't have to get back all
> >> the raw data. Instead, the client gets back the aggregated data (to
> >> conserve network bandwidth and to run the scan where the data lives). The
> >> client then does a final merge sort.
> >>
> >
> > I've written an iterator to do a group by previously. Depending on the
> > schema this is fine.
> >
> >
> >> - the ORDER BY clause used in combination with the LIMIT clause is a
> >> TopN query. We optimize this by each region holding on to the top N
> >> values with the client then doing a merge sort with the limit applied.
> >>
> >
> > This is an interesting one. If you remove the possibility of tablets
> > splitting out from underneath you and you had a view of the splits, you
> > could probably pull it off.
> >
> >
> >> - the ORDER BY clause on its own gets executed on each region (spooled
> >> using memory mapped files) and then the client does a merge sort. This
> >> spooling could potentially be done on the client side.
> >>
> >
> > Unless we can do some trickery with the schema, yeah, client side.
> >
> >
> >> - joins are executed as a broadcast hash join. We run one side of the
> >> query (with the filters applied), compact the results, and send them to
> >> each region server where they are cached while we run the other side of
> >> the query. A coprocessor then does a map lookup (equi-joins only are
> >> supported currently) to join based on the join key and returns the joined
> >> results (i.e. the concatenated values in a single, condensed key value as
> >> access from the client is positional post-join).
> >>
> >
> > The join approach would need to be implemented some other way, given the
> > earlier stated comparison of iterators and coprocessors.
>
>
> Client-side could be another fallback. The coprocessor approach is really
> only a big win in two cases: if you have a join which doesn't have many
> matches (as those rows get filtered on the server-side), or for correlated
> sub queries or exists queries where you can filter or collapse many rows to
> one or none on the server-side rather than return them all to the client.
>
> >
> >
> >> For our global secondary indexes (local secondary indexes are different
> >> as we discussed already), we trap updates to the data table through a
> >> coprocessor. For index maintenance you need to know when a change occurs
> >> to a data row what the prior value of the row was. The reason is because
> >> you need to delete the index row corresponding to the old data row and
> >> then insert the index row corresponding to the new value (remember, the
> >> index value makes up the row key). By doing this operation through a
> >> coprocessor, we know that we can get the prior data row state locally. We
> >> still need to issue a Put from one region server to another, but this
> >> isn't really an extra hop, as if it was done on the client, the same hop
> >> would need to be done (but the old row state would need to be pulled over
> >> to the client which is not necessary with the coprocessor based
> >> approach). For more on global secondary indexing, see
> >> http://phoenix.incubator.apache.org/secondary_indexing.html (there are
> >> some good presentations at the end of the page that provide more
> >> technical detail).
> >>
> >
> > Right, you want to remove the old index value and update a new index
> > value (actually being two unique keys) in the same transaction to ensure a
> > valid index. Or, at least ensure that you never remove the old value, and
> > die before inserting the new value.
> >
> > Again, not going to work well in an iterator.
> >
> >
> >> Phoenix also allows "point-in-time" queries where a client may
> >> establish a connection at an earlier timestamp. If your table is set up
> >> to keep multiple versions of the same row, then you can query
> >> "back-in-time" and will see the data as it was at that point. We more or
> >> less get this for free with the MVCC model of HBase by specifying a max
> >> timestamp on a scan. One slightly tricky bit is we correlate the current
> >> DDL of your table based on the same timestamp as with your data. So when
> >> you go back-in-time like this, you'll also see the structure of your
> >> table as it was at that time.
> >>
> >
> > I don't see this as a problem. As long as we remove the versioning
> > iterator from a table (which keeps the most recent version of a key by
> > default), it should be pretty easy to implement an iterator which adheres
> > to the "max timestamp" semantics.
> >
> >
> >> So we do rely on coprocessors, but the underlying APIs we're accessing on
> >> the server-side are pretty light.
> >>
> >>   TLDR? Let's continue in the JIRA?
> >>
> >>>
> >>>>
> >>> Mailing list is fine by me for while we get this hashed out :). We can
> >>> move to Jira when we start getting into specifics.
> >>>
> >>>
> >>
>
