A flush every time you add to BufferedMutator? That will result in an rpc
each time. Is that what you want? You don't want to let the BufferedMutator
backing buffer fill and auto flush?

Be sure to call close when shutting down else whatever is in the
BufferedMutator backing buffer will be lost.

St.Ack

On Wed, Feb 10, 2016 at 12:33 AM, Serega Sheypak <serega.shey...@gmail.com>
wrote:

> It helped, thanks.
> Now I have single reusable BufferedMutator instance and I don't call
> .close() after mutations, I call .flush() Is it ok?
>
> 2016-02-09 23:09 GMT+01:00 Stack <st...@duboce.net>:
>
> > On Tue, Feb 9, 2016 at 9:46 AM, Serega Sheypak <serega.shey...@gmail.com
> >
> > wrote:
> >
> > > I've modified my code:
> > >
> > > void saveUsers(Collection<User> users){
> > >         if(users && !users.isEmpty()){
> > >
> > >             // Get Connection instance, instance created once. , BM is
> > new
> > > for each request.'
> > >
> >
> >
> > If new for each request, don't bother using BM. Otherwise, put the BM in
> > same place you keep your Connection and just be sure to call close on BM
> > when your app goes down.
> > St.Ack
> >
> >
> >
> >
> >
> >
> > >             def mutator =
> > > getConnection().getBufferedMutator(getBufferedMutatorParams())
> > >
> > >              List<Put> putList = users.collect{toPut(it)}
> > >             mutator.mutate(putList)
> > >             *mutator.close() // exception here*
> > >         }
> > >     }
> > >
> > > Exception is still thrown
> > >
> > > 2016-02-09 15:43 GMT+01:00 Stack <st...@duboce.net>:
> > >
> > > > On Tue, Feb 9, 2016 at 6:37 AM, Serega Sheypak <
> > serega.shey...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi, thanks for reply!
> > > > >
> > > > >
> > > > > > What should we add here to make the doc more clear on
> > > BufferedMutator?
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/BufferedMutator.html
> > > > > It's pretty clear.
> > > > >
> > > > > > And in the example?
> > > > > Example shows that both connection and buffered mutator are thread
> > safe
> > > > and
> > > > > they are closed at once.
> > > > > - Should keep single instance of connection and bufferedmutator per
> > > > thread?
> > > > >
> > > >
> > > > No. Connection says you should generally share the Connection
> instance.
> > > > Ditto for BufferedMutator. It is backed by a buffer so you are
> batching
> > > > your writes when you use it. If you share the BM, your batching will
> > more
> > > > 'regular'.
> > > >
> > > >
> > > > > - Should I keep connection per thread and instantiate mutator for
> > each
> > > > > request?
> > > > >
> > > >
> > > > See above.
> > > >
> > > >
> > > > > - what happens if autoflush enabled?
> > > > >
> > > >
> > > > When the backing buffer is full, it gets flushed to the cluster.
> > > >
> > > >
> > > > > - is it possible to turn on sync mode for saving data?
> > > > >
> > > >
> > > > Yes. Don't use a BufferedMutator. Your throughput will go down as you
> > do
> > > an
> > > > RPC per write.
> > > >
> > > >
> > > > > - Connection could get into failed/invalid state for some reason
> (RS
> > > > down,
> > > > > RS up, some networking partitioned happend). Is it possible?
> > > >
> > > >
> > > >
> > > > It could happen but I think by now we've seen nearly all of the ways
> in
> > > > which a Connection can fail and internally, it compensates.
> > > >
> > > >
> > > >
> > > > > If it's
> > > > > possible, then what is the right way to handle it: "close" failed
> > > > > connection and ask for new one?
> > > > >
> > > > >
> > > > Good question. Connection internally will retry and 'ride over' near
> > all
> > > > cluster issues but the catastrophic.
> > > >
> > > > St.Ack
> > > >
> > > >
> > > >
> > > > > > I assume users is collection of User's.
> > > > > > Have you tried obtaining / closing mutator for each User instead
> of
> > > > > sharing
> > > > > > the mutator ?
> > > > >
> > > > > > If another flush, say because there were lots of puts, then when
> > > close
> > > > > > comes in, we are out of threads and you'd get below.
> > > > > Looks like it's the root cause, let me try!
> > > > >
> > > > > Thank you for detailed explanation!
> > > > >
> > > > >
> > > > >
> > > > > 2016-02-09 7:10 GMT+01:00 Stack <st...@duboce.net>:
> > > > >
> > > > > > On Mon, Feb 8, 2016 at 3:01 PM, Serega Sheypak <
> > > > serega.shey...@gmail.com
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi, I'm confused with new HBase 1.0 API. API says that
> > application
> > > > > should
> > > > > > > manage connections (Previously HConnections) on their own.
> > Nothing
> > > is
> > > > > > > managed itnernally now.
> > > > > > >
> > > > > > >
> > > > > > That is right.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > > Here is an example:
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://hbase.apache.org/xref/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.html
> > > > > > >
> > > > > > > It gives no clue about lifecycle :(
> > > > > > >
> > > > > >
> > > > > >
> > > > > > Connection is fairly explicit:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html
> > > > > >
> > > > > > What should we add here to make the doc more clear on
> > > BufferedMutator?
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/BufferedMutator.html
> > > > > >
> > > > > > And in the example?
> > > > > >
> > > > > >
> > > > > >
> > > > > > > Right now I create single connection instance for servlet and
> > > > > > > BufferedMutator per request.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > The recommendation is a singled BufferedMutator shared across
> > > requests.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > > //getConnection returns single instance, it doesn't return new
> > > > > connection
> > > > > > > each time
> > > > > > > def mutator =
> > > > > > > getConnection().getBufferedMutator(getBufferedMutatorParams())
> > > > > > >
> > > > > >
> > > > > >
> > > > > > getConnection is your method?
> > > > > >
> > > > > >
> > > > > > getBufferedMutator creates a new one?
> > > > > >
> > > > > >
> > > > > >
> > > > > > > users.each{ mutator.mutate(toPut(it))}
> > > > > > > mutator.close() //exception is thrown here
> > > > > > >
> > > > > > >
> > > > > > The close is flushing out all the writes.
> > > > > >
> > > > > > If a BufferedMutator per servlet instance, there are probably
> many
> > > when
> > > > > > many requests coming in.
> > > > > >
> > > > > > See what happens when you create one:
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://hbase.apache.org/xref/org/apache/hadoop/hbase/client/ConnectionImplementation.html#313
> > > > > >
> > > > > > which calls through to here....
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://hbase.apache.org/xref/org/apache/hadoop/hbase/client/HTable.html#126
> > > > > >
> > > > > > ... which creates an executor of max 1 task only.
> > > > > >
> > > > > > If another flush, say because there were lots of puts, then when
> > > close
> > > > > > comes in, we are out of threads and you'd get below.
> > > > > >
> > > > > > St.Ack
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > And I get tons of exceptions thrown on "mutator.close()", what
> > do I
> > > > do
> > > > > > > wrong?
> > > > > > >
> > > > > > > WARNING: #905, the task was rejected by the pool. This is
> > > unexpected.
> > > > > > > Server is node04.server.com, 60020,1447338864601
> > > > > > > java.util.concurrent.RejectedExecutionException: Task
> > > > > > > java.util.concurrent.FutureTask@5cff3b40 rejected from
> > > > > > > java.util.concurrent.ThreadPoolExecutor@686c2853[Terminated,
> > pool
> > > > > size =
> > > > > > > 0,
> > > > > > > active threads = 0, queued tasks = 0, completed tasks = 1]
> > > > > > > at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
> > > > > > > at
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
> > > > > > > at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
> > > > > > > at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
> > > > > > > at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.sendMultiAction(AsyncProcess.java:956)
> > > > > > > at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.access$000(AsyncProcess.java:574)
> > > > > > > at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.hbase.client.AsyncProcess.submitMultiActions(AsyncProcess.java:423)
> > > > > > > at
> > > > > > >
> > > > >
> > >
> org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:403)
> > > > > > > at
> > > > > > >
> > > > >
> > >
> org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:320)
> > > > > > > at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:206)
> > > > > > > at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:158)
> > > > > > > at
> > > > org.apache.hadoop.hbase.client.BufferedMutator$close$0.call(Unknown
> > > > > > > Source)
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to