Hi Jinal,

> So how do I tell the planner in this case to run in Sequential format.
> Here is how the code looks like
I didn't get your question. Did you mean "sequence file" format? HBase
cannot load sequence files natively. You have to transform them into HFiles
via MR, then bulk load them into HBase.
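
For reference, a minimal end-to-end sketch of that transform-and-bulk-load
flow (essentially what your code below already does), assuming crunch-hbase
is on the classpath; the table name, staging path, and the "kvs" collection
are placeholders:

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadSketch {
  // Write a PCollection<KeyValue> out as HFiles via MR, then bulk load them.
  public static void bulkLoad(Pipeline pipeline, PCollection<KeyValue> kvs,
                              Configuration conf) throws Exception {
    Path hfileDir = new Path("/tmp/hfiles");      // staging dir (placeholder)
    HTable table = new HTable(conf, "my_table");  // target table (placeholder)

    // Plans and runs the MR job that sorts the cells and writes the HFiles.
    HFileUtils.writeToHFilesForIncrementalLoad(kvs, table, hfileDir);
    pipeline.run();

    // Moves the finished HFiles into the region servers; not an MR job.
    new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, table);
  }
}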

The code piece you show looks good to me. One potential problem is that
running MR on the same cluster as HBase will degrade the serving quality
(i.e., cause higher latency).

In our production deployment, we have two clusters: one runs HBase and
serves online requests, and the other runs offline MR jobs. So we run a
pipeline to produce HFiles, distcp them to the online cluster (using a
limited number of mappers, e.g. distcp's -m flag), then perform an HBase
bulk load.

> Is there a way to do it Crunch itself to do incremental reads from HFiles
> which are stored in hdfs?

No, there is no way to incrementally read HFiles. I think you have two
options (sketched below):
1) Use FromHBase#table(String, Scan) and specify the timestamp range you
are interested in. Note that this will issue read RPCs to the Region
Servers, which may produce heavy traffic and damage your online serving
quality.
2) Copy the HFiles to another location and use HFileUtils#scanHFiles(). You
can also specify the time range there, but internally it still does a full
scan.
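
To make the two options concrete, here is a rough sketch; the table name,
HFile location, and timestamp bounds are placeholders, and exact signatures
vary a bit across Crunch/HBase versions, so treat it as an outline:

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.io.hbase.FromHBase;
import org.apache.crunch.io.hbase.HFileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

public class IncrementalReadSketch {
  public static void readRecent(Pipeline pipeline, long from, long to)
      throws Exception {
    Scan scan = new Scan();
    scan.setTimeRange(from, to);  // only return cells written in [from, to)

    // Option 1: read the live table; this issues RPCs to the region servers.
    PTable<ImmutableBytesWritable, Result> fromTable =
        pipeline.read(FromHBase.table("my_table", scan));

    // Option 2: scan copied HFiles directly; no region server traffic, but
    // internally still a full pass over the files even with the time range.
    PCollection<KeyValue> fromFiles =
        HFileUtils.scanHFiles(pipeline, new Path("/backups/hfiles"), scan);
  }
}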

2014-02-14 2:30 GMT+08:00 Jinal Shah <[email protected]>:

> Is there a way to do it Crunch itself to do incremental reads from HFiles
> which are stored in hdfs?
>
> Thanks
>
>
> On Thu, Feb 13, 2014 at 11:50 AM, Josh Wills <[email protected]> wrote:
>
> > The only option I have for you in that case is pipeline.run or
> > pipeline.done; LoadIncrementalHFiles isn't Crunch code, so we can't
> > incorporate it into the planner's decision making process. Does
> > LoadIncrementalHFiles even run an MR job?
> >
> >
> > On Thu, Feb 13, 2014 at 9:27 AM, Jinal Shah <[email protected]>
> > wrote:
> >
> > > Hi Josh,
> > > I tried the option you said and it worked perfectly fine where I'm
> > > doing parallelDo. But for HBase I'm using
> > > HFileUtils.writeToHFilesForIncrementalLoad() to write and then reading
> > > using the LoadIncrementalHFiles class. So how do I tell the planner in
> > > this case to run in Sequential format. Here is how the code looks like
> > >
> > > HFileUtils.writeToHFilesForIncrementalLoad(PCollection<KeyValue>, table, path)
> > > pipeline.run()
> > >
> > > LoadIncrementalHFiles loadIncremental = new LoadIncrementalHFiles(config);
> > >
> > > loadIncremental.doBulkLoad(path, table);
> > >
> > > Thanks
> > > Jinal
> > >
> > >
> > > On Wed, Feb 12, 2014 at 11:27 AM, Josh Wills <[email protected]> wrote:
> > >
> > > > I'm not sure what you want here-- there is a mechanism to force the
> > > > planner to run stages sequentially (even if the input to stage 2 does
> > > > not directly depend on the output from stage 1) by using
> > > > ParallelDoOptions to introduce such a dependency, as I indicated
> > > > before:
> > > >
> > > > SourceTarget marker = ...;
> > > > HBase.read()
> > > > doSomeChangesOnData
> > > > dummyDoFnToCreateMarker
> > > > HBase.write()
> > > > marker.write()
> > > > HBase.read().parallelDo(DoFn, PType,
> > > > ParallelDoOptions.builder().sourceTarget(marker).build());
> > > >
> > > > That's less of a hint to the planner and more of a command. Another
> > > > option would be to set the maximum number of simultaneously running
> > > > jobs in Crunch to 1 using the crunch.max.running.jobs configuration
> > > > parameter, which would run everything in the pipeline sequentially,
> > > > one job at a time.
> > > >
> > > >
> > > >
> > > > On Wed, Feb 12, 2014 at 9:15 AM, Jinal Shah <[email protected]> wrote:
> > > >
> > > > > Can I get some comment on this?
> > > > >
> > > > >
> > > > > On Thu, Feb 6, 2014 at 11:00 AM, Jinal Shah <[email protected]> wrote:
> > > > >
> > > > > > Hi Josh and Micah,
> > > > > >
> > > > > > In both scenarios I can easily do a Pipeline.run() and get going
> > > > > > from there. But my main question would be: why should I do a
> > > > > > pipeline.run() in between just to make the planner run something
> > > > > > in a sequential format, rather than the way it would have planned
> > > > > > otherwise? What I'm getting at is that there should be some
> > > > > > mechanism that tells the Planner to do something in a certain
> > > > > > way, to some extent. Take the example of Apache Hive: until the
> > > > > > 0.7 release, Hive used to provide a mechanism called a HINT,
> > > > > > which would tell the query planner to run something as indicated
> > > > > > in the HINT rather than the way it would have run otherwise. I
> > > > > > know you might say it might not create an optimized plan, but at
> > > > > > this point the consumer is more focused on the way it should be
> > > > > > planned rather than on the optimization.
> > > > > >
> > > > > > Maybe there is already an option in Crunch that I have not
> > > > > > explored, but I just wanted to put my point out there. If there
> > > > > > is an option, I would love to learn about it.
> > > > > >
> > > > > >
> > > > > > On Thu, Feb 6, 2014 at 10:44 AM, Josh Wills <[email protected]> wrote:
> > > > > >
> > > > > >> Hey Jinal,
> > > > > >>
> > > > > >> On scenario 2, the easiest way to do this is to force a run()
> > > > > >> between the write and the second read, ala:
> > > > > >>
> > > > > >> HBase.read()
> > > > > >> doSomeChangesOnData
> > > > > >> HBase.write()
> > > > > >> Pipeline.run()
> > > > > >> HBase.read()
> > > > > >>
> > > > > >> If that isn't possible for some reason, you'll need to add an
> > > > > >> output file to the first phase that can be used to indicate that
> > > > > >> the HBase.write is complete, and then have the second read
> > > > > >> depend on that file existing before it can run, which can be
> > > > > >> done via ParallelDoOptions, e.g.,
> > > > > >>
> > > > > >> SourceTarget marker = ...;
> > > > > >> HBase.read()
> > > > > >> doSomeChangesOnData
> > > > > >> dummyDoFnToCreateMarker
> > > > > >> HBase.write()
> > > > > >> marker.write()
> > > > > >> HBase.read().parallelDo(DoFn, PType,
> > > > > >> ParallelDoOptions.builder().sourceTarget(marker).build());
> > > > > >>
> > > > > >> but that's obviously uglier and more complicated.
> > > > > >>
> > > > > >> J
> > > > > >>
> > > > > >>
> > > > > >> On Wed, Feb 5, 2014 at 7:14 PM, Jinal Shah <[email protected]> wrote:
> > > > > >>
> > > > > >> > Hi Josh,
> > > > > >> >
> > > > > >> > Here is a small example of what I am looking for. So here is
> > > > > >> > what I'm doing:
> > > > > >> >
> > > > > >> > Scenario 1:
> > > > > >> >
> > > > > >> > PCollection<Something> s = FunctionDoingSomething();
> > > > > >> > pipeline.write(s, path);
> > > > > >> > doSomeFilteringOn(s);
> > > > > >> >
> > > > > >> > I want the filtering to be done in the map phase, but instead
> > > > > >> > it is done in the Reduce phase, due to which I have to
> > > > > >> > introduce a pipeline.run(); now this is what the code looks
> > > > > >> > like:
> > > > > >> >
> > > > > >> > PCollection<Something> s = FunctionDoingSomething();
> > > > > >> > pipeline.write(s, path);
> > > > > >> > pipeline.run()
> > > > > >> > doSomeFilteringOn(s);
> > > > > >> >
> > > > > >> > Scenario 2:
> > > > > >> >
> > > > > >> > I'm doing an operation on HBase and here is how it looks.
> > > > > >> >
> > > > > >> > Hbase.read()
> > > > > >> > doSomeChangesOnData
> > > > > >> > HBase.write()
> > > > > >> > HBase.read()
> > > > > >> >
> > > > > >> > Now Crunch at this point considers both reads as separate and
> > > > > >> > tries to run them in parallel, so before I even write my
> > > > > >> > changes it reads those changes; I have to again put a
> > > > > >> > pipeline.run() in order to break it into 2 separate flows and
> > > > > >> > execute them in sequence.
> > > > > >> >
> > > > > >> > So I'm asking: is there any way to send a HINT to the Planner
> > > > > >> > about how it should create the Plan instead of it deciding by
> > > > > >> > itself, or some way to have more control over how the planner
> > > > > >> > behaves in certain situations?
> > > > > >> >
> > > > > >> > Thanks
> > > > > >> > Jinal
> > > > > >> >
> > > > > >> >
> > > > > >> > On Thu, Jan 30, 2014 at 11:10 AM, Josh Wills <[email protected]> wrote:
> > > > > >> >
> > > > > >> > > On Thu, Jan 30, 2014 at 7:09 AM, Jinal Shah <[email protected]> wrote:
> > > > > >> > >
> > > > > >> > > > Hi everyone,
> > > > > >> > > >
> > > > > >> > > > This is Jinal Shah; I'm new to the group. I had a question
> > > > > >> > > > about Execution Control in Crunch. Is there any way we can
> > > > > >> > > > force Crunch to do certain operations in parallel or
> > > > > >> > > > certain operations in a sequential way? For example, let's
> > > > > >> > > > say we want the pipeline to execute a particular DoFn in
> > > > > >> > > > the Map phase instead of the Reduce phase, or vice-versa.
> > > > > >> > > > Or execute a particular flow only after another flow is
> > > > > >> > > > completed, as opposed to running them in parallel.
> > > > > >> > > >
> > > > > >> > >
> > > > > >> > > Forcing a DoFn to operate in a map or reduce phase is tough
> > > > > >> > > for the planner to do right now; we sort of rely on the
> > > > > >> > > developer to have a mental model of how the jobs will
> > > > > >> > > proceed. The place where you usually want to force a DoFn to
> > > > > >> > > execute in the reduce vs. the map phase is when you have
> > > > > >> > > dependent groupByKey operations, and you can use cache() or
> > > > > >> > > materialize() on the intermediate output that you want to
> > > > > >> > > split on, and the planner will respect that.
> > > > > >> > >
> > > > > >> > > On the latter question, the thing to look for is
> > > > > >> > > org.apache.crunch.ParallelDoOptions, which isn't something
> > > > > >> > > I've doc'd in the user guide yet (it's on the todo list, I
> > > > > >> > > promise.) You can give a parallelDo call an additional
> > > > > >> > > argument that specifies one or more SourceTargets that have
> > > > > >> > > to exist before a particular DoFn is allowed to run. In this
> > > > > >> > > way, you can force aspects of the pipeline to be sequential
> > > > > >> > > instead of parallel. We make use of ParallelDoOptions inside
> > > > > >> > > of the MapsideJoinStrategy code, to ensure that the data set
> > > > > >> > > that we'll be loading in-memory actually exists in the file
> > > > > >> > > system before we run the code that reads it into memory.
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > >
> > > > > >> > > > Maybe this has been asked before, so sorry if it came up
> > > > > >> > > > again. If you have further questions on the details, do
> > > > > >> > > > let me know.
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > Thanks everyone, and have a great day.
> > > > > >> > > >
> > > > > >> > > > Thanks
> > > > > >> > > > Jinal
> > > > > >> > > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > --
> > > > > >> > > Director of Data Science
> > > > > >> > > Cloudera <http://www.cloudera.com>
> > > > > >> > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Director of Data Science
> > > > Cloudera <http://www.cloudera.com>
> > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > >
> > >
> >
> >
> >
> > --
> > Director of Data Science
> > Cloudera <http://www.cloudera.com>
> > Twitter: @josh_wills <http://twitter.com/josh_wills>
> >
>
