Lars,

I found that at least the exceptions have nothing to do with shared HTable.

To save the resources, I designed a pool for the classes that write
and read from HBase. The primary resources consumed in the classes are
HTable. The pool has some bugs.

My question is whether it is necessary to design such a pool? Is it
fine to create a instance of HTable for each thread?

I noticed that HBase has a class, HTablePool. Maybe the pool I
designed is NOT required?

Thanks so much!

Best wishes!
Bing

On Wed, Feb 6, 2013 at 1:05 PM, lars hofhansl <la...@apache.org> wrote:
> Are you sharing this.rankTable between threads? HTable is not thread safe.
>
> -- Lars
>
>
>
> ________________________________
>  From: Bing Li <lbl...@gmail.com>
> To: "hbase-u...@hadoop.apache.org" <hbase-u...@hadoop.apache.org>; user 
> <user@hbase.apache.org>
> Sent: Tuesday, February 5, 2013 8:54 AM
> Subject: Re: Is "synchronized" required?
>
> Dear all,
>
> After "synchronized" is removed from the method of writing, I get the
> following exceptions when reading. Before the removal, no such
> exceptions.
>
> Could you help me how to solve it?
>
> Thanks so much!
>
> Best wishes,
> Bing
>
>      [java] Feb 6, 2013 12:21:31 AM
> org.apache.hadoop.hbase.ipc.HBaseClient$Connection run
>      [java] WARNING: Unexpected exception receiving call responses
>      [java] java.lang.NullPointerException
>      [java]     at
> org.apache.hadoop.hbase.io.HbaseObjectWritable.readObject(HbaseObjectWritable.java:521)
>      [java]     at
> org.apache.hadoop.hbase.io.HbaseObjectWritable.readFields(HbaseObjectWritable.java:297)
>      [java]     at
> org.apache.hadoop.hbase.ipc.HBaseClient$Connection.receiveResponse(HBaseClient.java:593)
>      [java]     at
> org.apache.hadoop.hbase.ipc.HBaseClient$Connection.run(HBaseClient.java:505)
>      [java] Feb 6, 2013 12:21:31 AM
> org.apache.hadoop.hbase.client.ScannerCallable close
>      [java] WARNING: Ignore, probably already closed
>      [java] java.io.IOException: Call to greatfreeweb/127.0.1.1:60020
> failed on local exception: java.io.IOException: Unexpected exception
> receiving call responses
>      [java]     at
> org.apache.hadoop.hbase.ipc.HBaseClient.wrapException(HBaseClient.java:934)
>      [java]     at
> org.apache.hadoop.hbase.ipc.HBaseClient.call(HBaseClient.java:903)
>      [java]     at
> org.apache.hadoop.hbase.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:150)
>      [java]     at $Proxy6.close(Unknown Source)
>      [java]     at
> org.apache.hadoop.hbase.client.ScannerCallable.close(ScannerCallable.java:112)
>      [java]     at
> org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:74)
>      [java]     at
> org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:39)
>      [java]     at
> org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.getRegionServerWithRetries(HConnectionManager.java:1325)
>      [java]     at
> org.apache.hadoop.hbase.client.HTable$ClientScanner.nextScanner(HTable.java:1167)
>      [java]     at
> org.apache.hadoop.hbase.client.HTable$ClientScanner.next(HTable.java:1296)
>      [java]     at
> org.apache.hadoop.hbase.client.HTable$ClientScanner$1.hasNext(HTable.java:1356)
>      [java]     at
> com.greatfree.hbase.rank.NodeRankRetriever.LoadNodeGroupNodeRankRowKeys(NodeRankRetriever.java:348)
>      [java]     at
> com.greatfree.ranking.PersistNodeGroupNodeRanksThread.run(PersistNodeGroupNodeRanksThread.java:29)
>      [java]     at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>      [java]     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>      [java]     at java.lang.Thread.run(Thread.java:662)
>      [java] Caused by: java.io.IOException: Unexpected exception
> receiving call responses
>      [java]     at
> org.apache.hadoop.hbase.ipc.HBaseClient$Connection.run(HBaseClient.java:509)
>      [java] Caused by: java.lang.NullPointerException
>      [java]     at
> org.apache.hadoop.hbase.io.HbaseObjectWritable.readObject(HbaseObjectWritable.java:521)
>      [java]     at
> org.apache.hadoop.hbase.io.HbaseObjectWritable.readFields(HbaseObjectWritable.java:297)
>      [java]     at
> org.apache.hadoop.hbase.ipc.HBaseClient$Connection.receiveResponse(HBaseClient.java:593)
>      [java]     at
> org.apache.hadoop.hbase.ipc.HBaseClient$Connection.run(HBaseClient.java:505)
>
>
> The code that causes the exceptions is as follows.
>
>         public Set<String> LoadNodeGroupNodeRankRowKeys(String
> hostNodeKey, String groupKey, int timingScale)
>         {
>                 List<Filter> nodeGroupFilterList = new ArrayList<Filter>();
>
>                 SingleColumnValueFilter hostNodeKeyFilter = new
> SingleColumnValueFilter(RankStructure.NODE_GROUP_NODE_RANK_FAMILY,
> RankStructure.NODE_GROUP_NODE_RANK_HOST_NODE_KEY_COLUMN,
> CompareFilter.CompareOp.EQUAL, new SubstringComparator(hostNodeKey));
>                 hostNodeKeyFilter.setFilterIfMissing(true);
>                 nodeGroupFilterList.add(hostNodeKeyFilter);
>
>                 SingleColumnValueFilter groupKeyFilter = new
> SingleColumnValueFilter(RankStructure.NODE_GROUP_NODE_RANK_FAMILY,
> RankStructure.NODE_GROUP_NODE_RANK_GROUP_KEY_COLUMN,
> CompareFilter.CompareOp.EQUAL, new SubstringComparator(groupKey));
>                 groupKeyFilter.setFilterIfMissing(true);
>                 nodeGroupFilterList.add(groupKeyFilter);
>
>                 SingleColumnValueFilter timingScaleFilter = new
> SingleColumnValueFilter(RankStructure.NODE_GROUP_NODE_RANK_FAMILY,
> RankStructure.NODE_GROUP_NODE_RANK_TIMING_SCALE_COLUMN,
> CompareFilter.CompareOp.EQUAL, new
> BinaryComparator(Bytes.toBytes(timingScale)));
>                 timingScaleFilter.setFilterIfMissing(true);
>                 nodeGroupFilterList.add(timingScaleFilter);
>
>                 FilterList nodeGroupFilter = new
> FilterList(nodeGroupFilterList);
>                 Scan scan = new Scan();
>                 scan.setFilter(nodeGroupFilter);
>                 scan.setCaching(Parameters.CACHING_SIZE);
>                 scan.setBatch(Parameters.BATCHING_SIZE);
>
>                 Set<String> rowKeySet = Sets.newHashSet();
>                 try
>                 {
>                         ResultScanner scanner = 
> this.rankTable.getScanner(scan);
>                         for (Result result : scanner)          //
> <---- EXCEPTIONS are raised at this line.
>                         {
>                                 for (KeyValue kv : result.raw())
>                                 {
>
> rowKeySet.add(Bytes.toString(kv.getRow()));
>                                         break;
>                                 }
>                         }
>                         scanner.close();
>                 }
>                 catch (IOException e)
>                 {
>                         e.printStackTrace();
>                 }
>                 return rowKeySet;
>         }
>
>
> On Tue, Feb 5, 2013 at 4:20 AM, Bing Li <lbl...@gmail.com> wrote:
>> Dear all,
>>
>> When writing data into HBase, sometimes I got exceptions. I guess they
>> might be caused by concurrent writings. But I am not sure.
>>
>> My question is whether it is necessary to put "synchronized" before
>> the writing methods? The following lines are the sample code.
>>
>> I think the directive, synchronized, must lower the performance of
>> writing. Sometimes concurrent writing is needed in my system.
>>
>> Thanks so much!
>>
>> Best wishes,
>> Bing
>>
>> public synchronized void AddDomainNodeRanks(String domainKey, int
>> timingScale, Map<String, Double> nodeRankMap)
>> {
>>       List<Put> puts = new ArrayList<Put>();
>>       Put domainKeyPut;
>>       Put timingScalePut;
>>       Put nodeKeyPut;
>>       Put rankPut;
>>
>>       byte[] domainNodeRankRowKey;
>>
>>       for (Map.Entry<String, Double> nodeRankEntry : nodeRankMap.entrySet())
>>       {
>>           domainNodeRankRowKey =
>> Bytes.toBytes(RankStructure.DOMAIN_NODE_RANK_ROW +
>> Tools.GetAHash(domainKey + timingScale + nodeRankEntry.getKey()));
>>
>>          domainKeyPut = new Put(domainNodeRankRowKey);
>>          domainKeyPut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY,
>> RankStructure.DOMAIN_NODE_RANK_DOMAIN_KEY_COLUMN,
>> Bytes.toBytes(domainKey));
>>          puts.add(domainKeyPut);
>>
>>          timingScalePut = new Put(domainNodeRankRowKey);
>>          timingScalePut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY,
>> RankStructure.DOMAIN_NODE_RANK_TIMING_SCALE_COLUMN,
>> Bytes.toBytes(timingScale));
>>         puts.add(timingScalePut);
>>
>>         nodeKeyPut = new Put(domainNodeRankRowKey);
>>         nodeKeyPut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY,
>> RankStructure.DOMAIN_NODE_RANK_NODE_KEY_COLUMN,
>> Bytes.toBytes(nodeRankEntry.getKey()));
>>         puts.add(nodeKeyPut);
>>
>>         rankPut = new Put(domainNodeRankRowKey);
>>         rankPut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY,
>> RankStructure.DOMAIN_NODE_RANK_RANKS_COLUMN,
>> Bytes.toBytes(nodeRankEntry.getValue()));
>>         puts.add(rankPut);
>>      }
>>
>>      try
>>      {
>>          this.rankTable.put(puts);
>>      }
>>      catch (IOException e)
>>      {
>>          e.printStackTrace();
>>      }
>> }

Reply via email to