[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15257959#comment-15257959
 ] 

ASF GitHub Bot commented on APEXMALHAR-1957:
--------------------------------------------

Github user sandeepdeshmukh commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/212#discussion_r61073466
  
    --- Diff: 
contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java 
---
    @@ -69,116 +69,103 @@ public Object convertValue( HBaseFieldInfo fieldInfo, 
Object value)
       public void setup(OperatorContext context)
       {
         try {
    -      store.connect();
    +      super.setup(context);
           pojoType = Class.forName(pojoTypeName);
           pojoType.newInstance();   //try create new instance to verify the 
class.
           rowSetter = PojoUtils.createSetter(pojoType, 
tableInfo.getRowOrIdExpression(), String.class);
    -      fieldValueGenerator = 
FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo() 
);
    +      fieldValueGenerator = 
HBaseFieldValueGenerator.getHBaseFieldValueGenerator(pojoType, 
tableInfo.getFieldsInfo() );
           valueConverter = new BytesValueConverter();
         } catch (Exception ex) {
           throw new RuntimeException(ex);
         }
       }
     
       @Override
    -  public void beginWindow(long windowId)
    -  {
    -  }
    -
    -  @Override
    -  public void teardown()
    +  protected Object getTuple(Result result)
       {
         try {
    -      store.disconnect();
    -    } catch (IOException ex) {
    -      throw new RuntimeException(ex);
    -    }
    -  }
    -
    -  @Override
    -  public void emitTuples()
    -  {
    -    try {
    -      Scan scan = nextScan();
    -      if (scan == null)
    -        return;
    -
    -      ResultScanner resultScanner = store.getTable().getScanner(scan);
    -
    -      while (true) {
    -        Result result = resultScanner.next();
    -        if (result == null)
    -          break;
    -
    -        String readRow = Bytes.toString(result.getRow());
    -        if( readRow.equals( lastReadRow ))
    -          continue;
    -
    -        Object instance = pojoType.newInstance();
    -        rowSetter.set(instance, readRow);
    -
    -        List<Cell> cells = result.listCells();
    +      String readRow = Bytes.toString(result.getRow());
    +      if( readRow.equals( getLastReadRow() )) {
    +        return null;
    +      }
     
    -        for (Cell cell : cells) {
    -          String columnName = 
Bytes.toString(CellUtil.cloneQualifier(cell));
    -          byte[] value = CellUtil.cloneValue(cell);
    -          fieldValueGenerator.setColumnValue( instance, columnName, value, 
valueConverter );
    -        }
    +      Object instance = pojoType.newInstance();
    +      rowSetter.set(instance, readRow);
     
    -        outputPort.emit(instance);
    -        lastReadRow = readRow;
    +       List<Cell> cells = result.listCells();
    +       for (Cell cell : cells) {
    +         String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
    +         String columnFamily = Bytes.toString(CellUtil.cloneFamily(cell));
    +        byte[] value = CellUtil.cloneValue(cell);
    +         
((HBaseFieldValueGenerator)fieldValueGenerator).setColumnValue(instance, 
columnName, columnFamily, value,
    +             valueConverter);
           }
     
    +      setLastReadRow(readRow);
    +      return instance;
         } catch (Exception e) {
    -      throw new RuntimeException(e.getMessage());
    +      throw new RuntimeException(e);
         }
    -
    -  }
    -
    -  protected Scan nextScan()
    -  {
    -    if(lastReadRow==null && startRow==null )
    -      return new Scan();
    -    else
    -      return new Scan( Bytes.toBytes( lastReadRow == null ? startRow : 
lastReadRow ) );
       }
     
    -  public HBaseStore getStore()
    -  {
    -    return store;
    -  }
    -  public void setStore(HBaseStore store)
    +  @Override
    +  protected Scan operationScan()
       {
    -    this.store = store;
    +    Scan scan;
    +    if (getLastReadRow() == null && getStartRow() == null) {
    +      // If no start row specified and no row read yet
    +      scan = new Scan();
    --- End diff --
    
    new Scan() can be moved to setup()


> Improve HBasePOJOInputOperator with support for threaded read
> -------------------------------------------------------------
>
>                 Key: APEXMALHAR-1957
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1957
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Bhupesh Chawda
>            Assignee: Bhupesh Chawda
>
> Add the following support to the HBase POJO Input Operator:
> * Add support for threaded read
> * Allow specifying a set of "column family: column" pairs and fetching data only for 
> these columns
> * Allow specifying an end row key at which to stop scanning
> * Add metrics



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to