Dear All,

I am requesting help with the async Thrift protocol in non-blocking streaming 
mode and would greatly appreciate any input on the issue outlined in the thread 
below. Adding a wait/delay defeats the purpose of using the async capability. 
Please help.

Thanks & Regards
Pankaj Misra

-----Original Message-----
From: Pankaj Misra
Sent: Monday, November 19, 2012 5:47 PM
To: user@hbase.apache.org
Subject: HBase NonBlocking and Async Thrift

Dear All,

I am currently using Hadoop 0.23.1 with HBase 0.94.1 in pseudo-distributed 
mode. I am trying to use the HBase Thrift API (not Thrift2 yet) in 
non-blocking, async mode to bulk-insert records. I am sharing the steps I 
followed, both for everyone's information and to set the context for my problem.

Below is the code that I am using to initialize the async client:

TBinaryProtocol.Factory binProtoFactory = new TBinaryProtocol.Factory();

TAsyncClientManager clientManager = null;
TNonblockingSocket nonBlockingSocket = null;

try {
  clientManager = new TAsyncClientManager();
} catch (IOException e) {
  throw new RuntimeException(e);
}
try {
  nonBlockingSocket = new TNonblockingSocket(HOST_NAME, PORT_NUMBER);
} catch (IOException e) {
  throw new RuntimeException(e);
}


I then initialize the client as shown below:

Hbase.AsyncClient client = new Hbase.AsyncClient(binProtoFactory, clientManager,
    nonBlockingSocket);



I could see two ways of using the client: one client for all the records 
to be inserted, or a separate client instance for every record. Since this is 
a non-blocking channel, I thought it would make sense to initialize one client 
for all the requests, as all the requests would be streamed using a framed 
transport.


// 1 async client for all the requests
Hbase.AsyncClient client = new Hbase.AsyncClient(binProtoFactory, clientManager,
    nonBlockingSocket);

// mutateRow called in a loop to insert multiple records, using the same client
client.mutateRow(table, ByteBuffer.wrap(key), mutations, mutationAttributes,
    new HBaseInsertAsyncHandler());


But I soon found that I was wrong, as I got back the following error:

java.lang.IllegalStateException: Client is currently executing another method: org.apache.hadoop.hbase.thrift.generated.Hbase$AsyncClient$mutateRow_call
    at org.apache.thrift.async.TAsyncClient.checkReady(TAsyncClient.java:78)
    at org.apache.hadoop.hbase.thrift.generated.Hbase$AsyncClient.mutateRow(Hbase.java:2714)

Reading through the following JIRA educated me a bit more on this:

https://issues.apache.org/jira/browse/THRIFT-945

So I changed my code to initialize a new client for each record to be inserted.


// both statements are called in a loop, once for every record to be inserted
Hbase.AsyncClient client = new Hbase.AsyncClient(binProtoFactory, clientManager,
    nonBlockingSocket);
client.mutateRow(table, ByteBuffer.wrap(key), mutations, mutationAttributes,
    new HBaseInsertAsyncHandler());

Even this failed, with the following error:

2012-11-19 17:25:15,275 WARN  [TAsyncClientManager#SelectorThread 9] async.TAsyncClientManager (TAsyncClientManager.java:startPendingMethods(177)) - Caught exception in TAsyncClientManager!
java.nio.channels.ClosedChannelException
    at java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:167)
    at java.nio.channels.SelectableChannel.register(SelectableChannel.java:254)
    at org.apache.thrift.transport.TNonblockingSocket.registerSelector(TNonblockingSocket.java:99)
    at org.apache.thrift.async.TAsyncMethodCall.start(TAsyncMethodCall.java:141)
    at org.apache.thrift.async.TAsyncClientManager$SelectThread.startPendingMethods(TAsyncClientManager.java:169)
    at org.apache.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:114)

So it looked to me as if the channel registration could not complete in time. 
Since the records are inserted in a tight loop, each new client possibly needs 
a time window to finish initializing before it can insert its record. I 
therefore had to introduce a delay after every insertion, which I would prefer 
not to do. The code changes for that are shown below:

Hbase.AsyncClient client = new Hbase.AsyncClient(binProtoFactory, clientManager,
    nonBlockingSocket);
client.mutateRow(table, ByteBuffer.wrap(key), mutations, mutationAttributes,
    new HBaseInsertAsyncHandler());
synchronized (client.getProtocolFactory()) {
    client.getProtocolFactory().wait(20);
}

With the above change, I could see the records getting inserted into HBase 
using the async Thrift client, but I do not think this is the right solution. 
I am looking for guidance from the community on a more consistent way to 
use the async Thrift capability without any fixed wait or sleep 
times, as adding a wait call kills the async advantage and reduces 
the overall throughput. I look forward to your help.
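
To make the question concrete, here is a minimal sketch of the kind of 
wait-free pattern I have in mind: a fixed pool of clients, where each client is 
handed back to the pool from its completion callback, so the insert loop blocks 
only while every client is busy and never sleeps for a fixed time. Note that 
StandInClient, insertAll and the pool size are hypothetical names of my own; 
StandInClient only mimics the one-call-in-flight check seen in the first stack 
trace and is not Thrift code. I am also assuming that, in a real version, each 
pooled Hbase.AsyncClient would need its own TNonblockingSocket, which I have 
not verified.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncClientPoolSketch {

    /** Hypothetical stand-in for Hbase.AsyncClient: one call in flight at a time. */
    static final class StandInClient {
        private final AtomicBoolean busy = new AtomicBoolean(false);
        private final ExecutorService io;

        StandInClient(ExecutorService io) {
            this.io = io;
        }

        /** Simulated async call; the key is unused because nothing is really sent. */
        void mutateRow(String key, Runnable onComplete) {
            // Mimics the checkReady() failure from the first stack trace.
            if (!busy.compareAndSet(false, true)) {
                throw new IllegalStateException("Client is currently executing another method");
            }
            io.execute(() -> {
                busy.set(false);   // the simulated call has finished
                onComplete.run();  // notify the caller, as a callback handler would
            });
        }
    }

    /** Issues `records` calls through `poolSize` clients; returns how many completed. */
    static int insertAll(int records, int poolSize) {
        ExecutorService io = Executors.newFixedThreadPool(2); // simulated selector threads
        BlockingQueue<StandInClient> idle = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            idle.add(new StandInClient(io));
        }
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(records);
        try {
            for (int i = 0; i < records; i++) {
                // Blocks only while every client has a call in flight.
                StandInClient client = idle.take();
                client.mutateRow("row-" + i, () -> {
                    completed.incrementAndGet();
                    idle.add(client); // hand the client back for the next record
                    done.countDown();
                });
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for callbacks");
            }
            return completed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while inserting", e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(insertAll(1000, 4)); // prints 1000
    }
}
```

In this sketch the only blocking point is idle.take(), which waits for a 
callback rather than for a fixed 20 ms, so the pool size caps the number of 
calls in flight. Whether the same shape works against the real HBase Thrift 
server is exactly what I would like the community's opinion on.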

Thanks and Regards
Pankaj Misra

________________________________

Neustar VP and Impetus CEO to present on 'Innovative information services 
powered by Cloud and Big Data technologies'at Cloud Expo - Santa Clara, Nov 
6th. http://www.impetus.com/events#2.

Check out Impetus contribution to build Luminar - a new business unit at 
Entravision. http://lf1.me/MS/


NOTE: This message may contain information that is confidential, proprietary, 
privileged or otherwise protected by law. The message is intended solely for 
the named addressee. If received in error, please destroy and notify the 
sender. Any use of this email is prohibited when received in error. Impetus 
does not represent, warrant and/or guarantee, that the integrity of this 
communication has been maintained nor that the communication is free of errors, 
virus, interception or interference.

