[ 
https://issues.apache.org/jira/browse/HBASE-23062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZhanxiongWang updated HBASE-23062:
----------------------------------
    Description: 
I ran the experiment in two ways: one reads HBase with Spark, the other reads HBase with MapReduce. In both cases, when I increase the scan caching size, some data is lost. More precisely, with scan.setCaching(500) I receive 7622 rows of data, but with scan.setCaching(50000) I receive only 4226 rows. What makes the problem serious is that the data is lost silently, without any exception, so it is difficult to find the cause.
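
As a baseline, the same row range can be counted directly with the client API, outside Spark and MapReduce. This is only a minimal sketch that reuses the zkPort/zkMaster/zkPath, table, family, and row-range variables from the jobs below (HTable is the 0.98-era client class):
{code:java}
// Count rows for the same range with a plain client-side scan,
// to compare against the Spark/MapReduce counts below.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.property.clientPort", zkPort);
conf.set("hbase.zookeeper.quorum", zkMaster);
conf.set("zookeeper.znode.parent", zkPath);
HTable table = new HTable(conf, hbaseTableName);
Scan scan = new Scan();
scan.addFamily(familyName);
scan.setStartRow(Bytes.toBytes(startRowkeyStr));
scan.setStopRow(Bytes.toBytes(endRowkeyStr));
ResultScanner scanner = table.getScanner(scan);
long rows = 0;
for (Result r : scanner) {
    rows++;
}
scanner.close();
table.close();
System.out.println("baseline row count: " + rows);
{code}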

My Spark code is as follows:
{code:java}
// Configure the HBase connection and the TableInputFormat source table.
Configuration hbaseConfiguration = HBaseConfiguration.create();
hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
hbaseConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseTableName);
hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);

final Scan hbaseScan = new Scan();
hbaseScan.addFamily(familyName);
hbaseScan.setCaching(50000); // if caching is too big, some row keys are lost!
for (String[] cell : cellNames) {
    String column = cell[0];
    hbaseScan.addColumn(familyName, Bytes.toBytes(column));
}
hbaseScan.setStartRow(Bytes.toBytes(startRowkeyStr));
hbaseScan.setStopRow(Bytes.toBytes(endRowkeyStr));

try {
    // Serialize the Scan and hand it to TableInputFormat via the configuration.
    ClientProtos.Scan scanProto = ProtobufUtil.toScan(hbaseScan);
    hbaseConfiguration.set(TableInputFormat.SCAN, Base64.encodeBytes(scanProto.toByteArray()));
    JavaPairRDD<ImmutableBytesWritable, Result> pairRDD =
        jsc.<ImmutableBytesWritable, Result, TableInputFormat>newAPIHadoopRDD(
            hbaseConfiguration, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
    System.out.println("pairRDD.count(): " + pairRDD.count());
} catch (IOException e) {
    System.out.println("Scan Exception!!!!!! " + e.getMessage());
}
{code}
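
To see which rows go missing, rather than just the total count, a minimal sketch that continues from the pairRDD above and writes one row key per line, so the output of a caching=500 run can be diffed against a caching=50000 run (the output path is just an example; needs JavaRDD, Function, and Tuple2 on the classpath):
{code:java}
// Extract the row key of every Result and write one key per line,
// so two runs with different caching values can be compared.
JavaRDD<String> rowKeys = pairRDD.map(
    new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
        @Override
        public String call(Tuple2<ImmutableBytesWritable, Result> t) {
            ImmutableBytesWritable k = t._1();
            return Bytes.toStringBinary(k.get(), k.getOffset(), k.getLength());
        }
    });
rowKeys.saveAsTextFile("maxTestRowkeys"); // example output path
{code}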
My MapReduce code is as follows:
{code:java}
static class HbaseMapper extends TableMapper<ImmutableBytesWritable, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Mapper.Context context)
            throws IOException, InterruptedException {
        // Emit one constant record per cell; only used to count what the scan returns.
        for (Cell cell : value.rawCells()) {
            context.write(new ImmutableBytesWritable("A".getBytes()), new Text("max"));
        }
    }
}

public static void main(String[] args) throws Exception {
    org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create();
    hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
    hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
    hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
    hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
    hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);

    Job job = Job.getInstance(hbaseConfiguration);
    job.setJarByClass(App.class);

    List<Scan> list = new ArrayList<Scan>();
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes(familyName));
    scan.setCaching(50000); // if caching is too big, some row keys are lost!
    for (String[] cell : cellNames) {
        String column = cell[0];
        scan.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column));
    }
    scan.setStartRow(Bytes.toBytes(startRowkeyStr));
    scan.setStopRow(Bytes.toBytes(endRowkeyStr));
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(hbaseTableName));
    list.add(scan);
    System.out.println("size: " + list.size());

    TableMapReduceUtil.initTableMapperJob(list, HbaseMapper.class,
        ImmutableBytesWritable.class, Text.class, job);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path("maxTestOutput"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}{code}
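
To confirm on the MapReduce side that rows are dropped before they ever reach user code, the framework's input-record counter can be compared between the two caching settings. A minimal sketch, assuming a Hadoop 2.x TaskCounter, that would replace the System.exit line above:
{code:java}
// MAP_INPUT_RECORDS counts the rows handed to the mappers by TableInputFormat;
// comparing it for caching=500 vs caching=50000 shows where the rows disappear.
boolean ok = job.waitForCompletion(true);
long mapInputRows = job.getCounters()
    .findCounter(org.apache.hadoop.mapreduce.TaskCounter.MAP_INPUT_RECORDS)
    .getValue();
System.out.println("map input rows: " + mapInputRows);
System.exit(ok ? 0 : 1);
{code}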
The pom.xml for the MapReduce code is attached:

[^pom.xml]

 


> Use TableInputFormat to read data from HBase: when the Scan.setCaching(size) 
> value is too big, some row keys are lost without exceptions.
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HBASE-23062
>                 URL: https://issues.apache.org/jira/browse/HBASE-23062
>             Project: HBase
>          Issue Type: Bug
>    Affects Versions: 0.98.6.1
>            Reporter: ZhanxiongWang
>            Priority: Major
>         Attachments: pom.xml
>


