On Tue, Feb 9, 2016 at 9:46 AM, Serega Sheypak <serega.shey...@gmail.com>
wrote:

> I've modified my code:
>
> void saveUsers(Collection<User> users){
>         if(users && !users.isEmpty()){
>
>             // Get the Connection instance (created once); the BM is new for each request.
>


If it is new for each request, don't bother using a BM. Otherwise, keep the BM in
the same place you keep your Connection, and just be sure to call close() on the
BM when your app goes down.
St.Ack
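
To illustrate the advice above with a toy model (this is not HBase code): BufferedSink below is a hypothetical stand-in for a buffered writer that flushes when its buffer reaches a threshold and flushes whatever is left on close(). All names and numbers are assumptions made up for the sketch. It only shows why a writer that is created and closed per request never batches across requests, while one shared instance that is closed once at shutdown does.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedBufferSketch {

    /** Hypothetical stand-in for a buffered writer; NOT the HBase BufferedMutator. */
    static final class BufferedSink implements AutoCloseable {
        private final int flushThreshold;
        private final List<String> buffer = new ArrayList<>();
        private final List<Integer> flushedBatchSizes;

        BufferedSink(int flushThreshold, List<Integer> flushedBatchSizes) {
            this.flushThreshold = flushThreshold;
            this.flushedBatchSizes = flushedBatchSizes;
        }

        /** Buffer one write; flush as a batch once the buffer is full. */
        synchronized void mutate(String write) {
            buffer.add(write);
            if (buffer.size() >= flushThreshold) {
                flush();
            }
        }

        /** Record the size of the batch that would go out in one round trip. */
        synchronized void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            flushedBatchSizes.add(buffer.size());
            buffer.clear();
        }

        /** Closing flushes whatever is still buffered. */
        @Override
        public synchronized void close() {
            flush();
        }
    }

    /** One sink per request: every close() forces a small flush. */
    public static List<Integer> perRequest(int requests, int writesPerRequest, int threshold) {
        List<Integer> batches = new ArrayList<>();
        for (int r = 0; r < requests; r++) {
            try (BufferedSink sink = new BufferedSink(threshold, batches)) {
                for (int w = 0; w < writesPerRequest; w++) {
                    sink.mutate("r" + r + "w" + w);
                }
            }
        }
        return batches;
    }

    /** One sink shared by all requests, closed once at "shutdown". */
    public static List<Integer> shared(int requests, int writesPerRequest, int threshold) {
        List<Integer> batches = new ArrayList<>();
        try (BufferedSink sink = new BufferedSink(threshold, batches)) {
            for (int r = 0; r < requests; r++) {
                for (int w = 0; w < writesPerRequest; w++) {
                    sink.mutate("r" + r + "w" + w);
                }
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println("per-request batches: " + perRequest(4, 3, 6)); // [3, 3, 3, 3]
        System.out.println("shared batches:      " + shared(4, 3, 6));     // [6, 6]
    }
}
```

With 4 requests of 3 writes each and a threshold of 6, the per-request variant flushes four batches of 3, and the shared variant flushes two full batches of 6. In a real application the shared instance would live next to the Connection and be closed when the application stops.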






>             def mutator = getConnection().getBufferedMutator(getBufferedMutatorParams())
>
>             List<Put> putList = users.collect{toPut(it)}
>             mutator.mutate(putList)
>             mutator.close() // exception here
>         }
>     }
>
> Exception is still thrown
>
> 2016-02-09 15:43 GMT+01:00 Stack <st...@duboce.net>:
>
> > On Tue, Feb 9, 2016 at 6:37 AM, Serega Sheypak <serega.shey...@gmail.com
> >
> > wrote:
> >
> > > Hi, thanks for reply!
> > >
> > >
> > > > What should we add here to make the doc more clear on
> > > > BufferedMutator?
> > > >
> > >
> > >
> >
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/BufferedMutator.html
> > > It's pretty clear.
> > >
> > > > And in the example?
> > > Example shows that both connection and buffered mutator are thread safe
> > > and they are closed at once.
> > > - Should I keep a single instance of connection and bufferedmutator per
> > > thread?
> > >
> >
> > No. Connection says you should generally share the Connection instance.
> > Ditto for BufferedMutator. It is backed by a buffer so you are batching
> > your writes when you use it. If you share the BM, your batching will be more
> > 'regular'.
> >
> >
> > > - Should I keep connection per thread and instantiate mutator for each
> > > request?
> > >
> >
> > See above.
> >
> >
> > > - what happens if autoflush enabled?
> > >
> >
> > When the backing buffer is full, it gets flushed to the cluster.
> >
> >
> > > - is it possible to turn on sync mode for saving data?
> > >
> >
> > Yes. Don't use a BufferedMutator. Your throughput will go down as you do
> > an RPC per write.
> >
> >
> > > - Connection could get into a failed/invalid state for some reason (RS
> > > down, RS up, some network partition happened). Is it possible?
> >
> >
> >
> > It could happen but I think by now we've seen nearly all of the ways in
> > which a Connection can fail and internally, it compensates.
> >
> >
> >
> > > If it's
> > > possible, then what is the right way to handle it: "close" failed
> > > connection and ask for new one?
> > >
> > >
> > Good question. Connection internally will retry and 'ride over' nearly all
> > cluster issues except the catastrophic ones.
> >
> > St.Ack
> >
> >
> >
> > > > I assume users is a collection of Users.
> > > > Have you tried obtaining / closing a mutator for each User instead of
> > > > sharing the mutator?
> > >
> > > > If another flush, say because there were lots of puts, then when
> > > > close comes in, we are out of threads and you'd get below.
> > > Looks like it's the root cause, let me try!
> > >
> > > Thank you for detailed explanation!
> > >
> > >
> > >
> > > 2016-02-09 7:10 GMT+01:00 Stack <st...@duboce.net>:
> > >
> > > > On Mon, Feb 8, 2016 at 3:01 PM, Serega Sheypak <
> > serega.shey...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi, I'm confused with the new HBase 1.0 API. The API says that the
> > > > > application should manage connections (previously HConnections) on
> > > > > its own. Nothing is managed internally now.
> > > > >
> > > > >
> > > > That is right.
> > > >
> > > >
> > > >
> > > >
> > > > > Here is an example:
> > > > >
> > > > >
> > > >
> > >
> >
> https://hbase.apache.org/xref/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.html
> > > > >
> > > > > It gives no clue about lifecycle :(
> > > > >
> > > >
> > > >
> > > > Connection is fairly explicit:
> > > >
> > > >
> > >
> >
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html
> > > >
> > > > What should we add here to make the doc more clear on
> BufferedMutator?
> > > >
> > > >
> > >
> >
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/BufferedMutator.html
> > > >
> > > > And in the example?
> > > >
> > > >
> > > >
> > > > > Right now I create single connection instance for servlet and
> > > > > BufferedMutator per request.
> > > > >
> > > > >
> > > > >
> > > > The recommendation is a single BufferedMutator shared across
> > > > requests.
> > > >
> > > >
> > > >
> > > >
> > > > > // getConnection returns a single instance; it doesn't return a new
> > > > > // connection each time
> > > > > def mutator = getConnection().getBufferedMutator(getBufferedMutatorParams())
> > > > >
> > > >
> > > >
> > > > getConnection is your method?
> > > >
> > > >
> > > > getBufferedMutator creates a new one?
> > > >
> > > >
> > > >
> > > > > users.each{ mutator.mutate(toPut(it))}
> > > > > mutator.close() //exception is thrown here
> > > > >
> > > > >
> > > > The close is flushing out all the writes.
> > > >
> > > > If there is a BufferedMutator per servlet instance, there are probably
> > > > many when many requests are coming in.
> > > >
> > > > See what happens when you create one:
> > > >
> > > >
> > > >
> > >
> >
> http://hbase.apache.org/xref/org/apache/hadoop/hbase/client/ConnectionImplementation.html#313
> > > >
> > > > which calls through to here....
> > > >
> > > >
> > >
> >
> http://hbase.apache.org/xref/org/apache/hadoop/hbase/client/HTable.html#126
> > > >
> > > > ... which creates an executor of max 1 task only.
> > > >
> > > > If another flush, say because there were lots of puts, then when
> > > > close comes in, we are out of threads and you'd get below.
> > > >
> > > > St.Ack
> > > >
> > > >
> > > >
> > > >
> > > > >
> > > > > And I get tons of exceptions thrown on "mutator.close()". What am I
> > > > > doing wrong?
> > > > >
> > > > > WARNING: #905, the task was rejected by the pool. This is unexpected. Server is node04.server.com, 60020,1447338864601
> > > > > java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@5cff3b40 rejected from java.util.concurrent.ThreadPoolExecutor@686c2853[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
> > > > > at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
> > > > > at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
> > > > > at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
> > > > > at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
> > > > > at org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.sendMultiAction(AsyncProcess.java:956)
> > > > > at org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.access$000(AsyncProcess.java:574)
> > > > > at org.apache.hadoop.hbase.client.AsyncProcess.submitMultiActions(AsyncProcess.java:423)
> > > > > at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:403)
> > > > > at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:320)
> > > > > at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:206)
> > > > > at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:158)
> > > > > at org.apache.hadoop.hbase.client.BufferedMutator$close$0.call(Unknown Source)
> > > > >
> > > >
> > >
> >
>
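
A note on the trace above: the pool describes itself as [Terminated, ...], which is the state a java.util.concurrent.ThreadPoolExecutor reports once shutdown() has completed. A pool in that state rejects every new task, and the default AbortPolicy turns that rejection into a RejectedExecutionException. The stdlib-only sketch below reproduces just that mechanism. It does not use HBase and makes no claim about which HBase code path shut the pool down; the class and method names are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class RejectedAfterShutdown {

    /** Returns true if a task submitted after shutdown is rejected. */
    public static boolean submitAfterShutdownIsRejected() {
        Runnable noop = () -> { };
        ExecutorService pool = Executors.newFixedThreadPool(1);

        // One task runs to completion while the pool is still live.
        pool.submit(noop);

        // Shut the pool down and wait until it has fully terminated.
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }

        // Any later submit is handed to the default AbortPolicy, which throws.
        try {
            pool.submit(noop);
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + submitAfterShutdownIsRejected());
    }
}
```

Running main prints "rejected after shutdown: true". The practical reading for this thread is that whatever owns the executor must outlive every flush that still needs it, which is one more reason to keep a single long-lived mutator and close it only at application shutdown.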
