Hey all -

I got a few questions today I couldn't answer, and wrote this up while
digging into some details. Currently it just discusses how data is loaded
with Pig up to the point a MR job is launched. Feedback would be great!

https://cwiki.apache.org/confluence/display/HCATALOG/Life+of+an+HCatalog+Query

Loading data in Pig with HCatLoader

Let's start by examining a simple Pig script with the following attributes:

   - Loads data through HCatalog.
   - Filters data by a partition field (part_dt) and a non-partition
   field (type).
   - Projects a single field from the records.

data = load 'db.table' using org.apache.hcatalog.pig.HCatLoader();

data = filter data by
    part_dt >= '20120725T000000Z' and
    part_dt < '20120726T000000Z' and
    type == 'LLAMA';

data = foreach data generate type;

dump data;

A number of existing Pig optimizations are supported so that data is loaded
efficiently through HCatalog, both in the Pig frontend and in the backend.

In the Pig Frontend

The first interaction with HCatLoader happens while Pig is parsing the
query and generating the logical plan. HCatLoader.getSchema is called,
which causes HCatalog to query the HiveMetaStore for the *table schema*;
that table schema is used as the schema for all records in this table. This
is important to understand because each partition can have its own schema,
but only fields in the table schema are actually usable.
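
Roughly speaking, the lookup behind getSchema amounts to the following. This
is written against the plain HiveMetaStoreClient API rather than HCatLoader's
internal code path, and the 'db'/'table' names are just the ones from our
example:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;

// What the getSchema call boils down to: a HiveMetaStore lookup of the
// *table*-level schema. Per-partition schemas are never consulted here.
public class TableSchemaLookup {
    public static void main(String[] args) throws Exception {
        HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
        Table table = client.getTable("db", "table");

        // Data columns as recorded at the table level...
        for (FieldSchema col : table.getSd().getCols()) {
            System.out.println(col.getName() + " : " + col.getType());
        }

        // ...plus the partition keys (part_dt in our example), which Pig
        // also sees as fields of every record.
        for (FieldSchema key : table.getPartitionKeys()) {
            System.out.println(key.getName() + " : " + key.getType());
        }
        client.close();
    }
}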

Once the logical query plan is complete, Pig compiles and optimizes the
plan. During optimization, Pig calls HCatLoader.getPartitionKeys, which
queries the HiveMetaStore for the table's partition keys, and
HCatLoader.setPartitionFilter, which sets the filter that will be used later
when querying the HiveMetaStore for input partitions.
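
To make those two hooks concrete, here is a minimal sketch of a
metastore-backed LoadMetadata implementation. This is not HCatLoader's actual
code - the class name is made up and error handling is simplified - but the
method signatures are Pig's LoadMetadata interface:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.Expression;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;

public class MetastoreLoadMetadata implements LoadMetadata {

    // The partition filter Pig pushes down; kept for use later when the
    // loader asks the metastore for the matching input partitions.
    private Expression partitionFilter;

    @Override
    public String[] getPartitionKeys(String location, Job job) throws IOException {
        // Assumes location looks like "db.table", as in the load statement.
        String[] dbTable = location.split("\\.");
        try {
            HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
            List<FieldSchema> keys =
                    client.getTable(dbTable[0], dbTable[1]).getPartitionKeys();
            client.close();
            String[] names = new String[keys.size()];
            for (int i = 0; i < keys.size(); i++) {
                names[i] = keys.get(i).getName();
            }
            return names;               // ["part_dt"] for our example table
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    @Override
    public void setPartitionFilter(Expression filter) throws IOException {
        // Only conditions on partition keys arrive here, e.g.
        // ((part_dt >= '20120725T000000Z') and (part_dt < '20120726T000000Z'))
        this.partitionFilter = filter;
    }

    @Override
    public ResourceSchema getSchema(String location, Job job) throws IOException {
        return null;                    // see the metastore lookup sketched earlier
    }

    @Override
    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
        return null;                    // statistics are optional
    }
}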

Let's examine the above filter statement, which includes both partition and
non-partition fields. Since Pig knows the table is partitioned by part_dt
(but not type), the following partition filter is used:

((part_dt >= '20120725T000000Z') and (part_dt < '20120726T000000Z'))

Let's also briefly examine how Pig loaders work. At the most basic level a
Pig loader implements LoadFunc, providing methods to set the input
location, get records, etc. Additional features may be provided by
implementing additional interfaces; in this context the interesting
interfaces implemented by HCatLoader are LoadMetadata (providing Pig with
access to the schema, partition keys, and partition filtering
functionality) and LoadPushDown (providing projection pushdown support).
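
A skeleton loader with the same shape as HCatLoader looks roughly like this.
The class name is made up and every body is stubbed out (TextInputFormat just
stands in for whatever input format a real loader returns), but the interfaces
and method signatures are the ones Pig defines:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;

public class SkeletonLoader extends LoadFunc implements LoadMetadata, LoadPushDown {

    // LoadFunc: the basic contract every loader fulfills.
    @Override
    public void setLocation(String location, Job job) throws IOException {
        // location is the 'db.table' string from the load statement
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();   // stand-in for the real input format
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { }

    @Override
    public Tuple getNext() throws IOException {
        return null;                    // null signals end of input
    }

    // LoadMetadata: schema, partition keys, and partition filter pushdown.
    @Override
    public ResourceSchema getSchema(String location, Job job) throws IOException {
        return null;                    // would return the table schema
    }

    @Override
    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
        return null;                    // statistics are optional
    }

    @Override
    public String[] getPartitionKeys(String location, Job job) throws IOException {
        return new String[] { "part_dt" };
    }

    @Override
    public void setPartitionFilter(Expression filter) throws IOException { }

    // LoadPushDown: projection pushdown.
    @Override
    public List<OperatorSet> getFeatures() {
        return Arrays.asList(OperatorSet.PROJECTION);
    }

    @Override
    public RequiredFieldResponse pushProjection(RequiredFieldList fields) throws FrontendException {
        return new RequiredFieldResponse(true);   // the projection will be honored
    }
}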

During query compilation and optimization, Pig calls
HCatBaseLoader.pushProjection, passing the list of required fields into the
loader. Since the above query only makes use of the non-partition field type,
that single field is added to a required fields list and pushed into the
loader. Later, when constructing Pig tuples, the loader can use the required
fields list to read the data as efficiently as possible.
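
Handling that pushed projection amounts to something like the following. The
helper class is made up (it is not HCatBaseLoader's actual code); it just
records which fields to read and tells Pig the projection will be honored:

import java.util.ArrayList;
import java.util.List;

import org.apache.pig.LoadPushDown.RequiredField;
import org.apache.pig.LoadPushDown.RequiredFieldList;
import org.apache.pig.LoadPushDown.RequiredFieldResponse;

public class ProjectionHandler {

    public static RequiredFieldResponse handle(RequiredFieldList requiredFields) {
        List<String> columnsToRead = new ArrayList<String>();
        for (RequiredField field : requiredFields.getFields()) {
            // Each entry carries the alias and position of a field the query
            // actually uses; everything else can be skipped while reading.
            columnsToRead.add(field.getAlias());   // ["type"] for our query
        }
        // A real loader would stash columnsToRead (e.g. in the UDFContext) so
        // the backend can build tuples containing only these fields.
        System.out.println("will read: " + columnsToRead);
        return new RequiredFieldResponse(true);    // projection will be honored
    }
}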

After Pig has completed compiling and optimizing the query execution plan,
it calls HCatLoader.setLocation to set the db_name.table_name to load data
from. This causes HCatLoader to query the HiveMetaStore for the partitions
that require processing (making sure to use the partition filter we
discussed above). These partitions are passed to HCatInputFormat.setInput,
which serializes them into the job configuration for access by the Pig
backend, and HCatInputFormat.setOutputSchema serializes the output schema
into the job configuration, which will be used when projecting records in
the backend.
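
For reference, here is roughly what that frontend work boils down to when
written directly against the HCatalog MapReduce API. The method names follow
the 0.4-era org.apache.hcatalog API and may differ in other releases; the
filter, database, table, and field names are the ones from our example:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class FrontendInputSetup {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "hcat-input-sketch");

        // The partition filter derived from the Pig query.
        String filter =
            "part_dt >= '20120725T000000Z' and part_dt < '20120726T000000Z'";

        // Resolves the matching partitions in the HiveMetaStore and
        // serializes them into the job configuration for the backend.
        HCatInputFormat.setInput(job, InputJobInfo.create("db", "table", filter));

        // The output (projection) schema is also serialized into the job
        // configuration; here it is just the single projected field 'type'.
        List<HCatFieldSchema> fields = new ArrayList<HCatFieldSchema>();
        fields.add(new HCatFieldSchema("type", HCatFieldSchema.Type.STRING, null));
        HCatInputFormat.setOutputSchema(job, new HCatSchema(fields));
    }
}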

And finally Pig begins to execute the query.

In the Pig Backend

The Pig backend has two main parts: a client that manages launching
MapReduce jobs on the cluster, and the jobs themselves.

When launching an MR job, Pig again calls HCatLoader.setLocation with the
db_name.table_name to load data from. This time, however, instead of
querying the HiveMetaStore for partition information, HCatLoader largely
restores the data previously serialized into the job configuration, and
calls HCatBaseInputFormat.setOutputSchema for use later when projecting
records.
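
Illustratively, the frontend/backend difference in setLocation comes down to
a "have we already serialized this?" check against the job configuration. The
key name and helper below are made up and only show the pattern, not
HCatLoader's actual internals:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SetLocationSketch {

    // Hypothetical key under which the frontend stored its partition info.
    private static final String PARTITION_INFO_KEY = "example.partition.info";

    public void setLocation(String location, Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        if (conf.get(PARTITION_INFO_KEY) == null) {
            // Frontend: first call, so query the metastore with the partition
            // filter and serialize the matching partitions into the job conf.
            conf.set(PARTITION_INFO_KEY, lookUpPartitionsInMetastore(location));
        }
        // Backend: the serialized partitions are already in the job conf, so
        // the metastore round trip is skipped and the data is simply restored.
        // In both cases the output (projection) schema is then (re)set so the
        // backend can project records.
    }

    private String lookUpPartitionsInMetastore(String location) {
        return "...";   // placeholder for the metastore lookup shown earlier
    }
}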

Now let's get the input splits, which represent the actual data to load.
HCatBaseInputFormat.getSplits is called, which deserializes the partitions
previously stored in the job configuration by the frontend. For each
partition it creates the actual input format, gets the actual splits, and
wraps each one as an HCatSplit containing the underlying split as well as
the partition information needed later to deserialize the records it
returns.
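
The wrapping pattern looks roughly like this toy version, where
TextInputFormat stands in for the per-partition storage format, a schema
string stands in for HCatalog's partition info, and the class names are made
up. (Recording the underlying split's class name so it can be recreated on
deserialization is the same trick HCatSplit relies on.)

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class WrappingInputFormat extends InputFormat<LongWritable, Text> {

    private final TextInputFormat underlying = new TextInputFormat();

    @Override
    public List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException {
        // One underlying input format per partition in the real code path;
        // a single TextInputFormat stands in here.
        List<InputSplit> wrapped = new ArrayList<InputSplit>();
        for (InputSplit split : underlying.getSplits(context)) {
            wrapped.add(new PartitionSplit(split, "type:string,part_dt:string"));
        }
        return wrapped;
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Unwrap and let the underlying format do the reading; the stored
        // partition info would be used later to convert its records.
        PartitionSplit ps = (PartitionSplit) split;
        return underlying.createRecordReader(ps.baseSplit, context);
    }

    // A split carrying the real split plus the partition metadata needed to
    // deserialize the records it returns.
    public static class PartitionSplit extends InputSplit implements Writable {
        private InputSplit baseSplit;
        private String partitionInfo;   // stands in for HCatalog's partition info

        public PartitionSplit() { }     // needed for Writable deserialization

        public PartitionSplit(InputSplit baseSplit, String partitionInfo) {
            this.baseSplit = baseSplit;
            this.partitionInfo = partitionInfo;
        }

        @Override
        public long getLength() throws IOException, InterruptedException {
            return baseSplit.getLength();
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException {
            return baseSplit.getLocations();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            Text.writeString(out, partitionInfo);
            // Record the underlying split's class so it can be recreated.
            Text.writeString(out, baseSplit.getClass().getName());
            ((Writable) baseSplit).write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            partitionInfo = Text.readString(in);
            String splitClass = Text.readString(in);
            try {
                baseSplit = (InputSplit) ReflectionUtils.newInstance(
                        Class.forName(splitClass), null);
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }
            ((Writable) baseSplit).readFields(in);
        }
    }
}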

At this point Pig actually launches a MapReduce job on the cluster.

--Travis
