[ https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15345684#comment-15345684 ]

ASF GitHub Bot commented on APEXMALHAR-1957:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/292#discussion_r68172645
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java ---
    @@ -69,116 +70,105 @@ public Object convertValue( HBaseFieldInfo fieldInfo, Object value)
       public void setup(OperatorContext context)
       {
         try {
    -      store.connect();
    +      super.setup(context);
           pojoType = Class.forName(pojoTypeName);
          pojoType.newInstance();   //try create new instance to verify the class.
          rowSetter = PojoUtils.createSetter(pojoType, tableInfo.getRowOrIdExpression(), String.class);
    -      fieldValueGenerator = FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo() );
    +      fieldValueGenerator = HBaseFieldValueGenerator.getHBaseFieldValueGenerator(pojoType, tableInfo.getFieldsInfo() );
           valueConverter = new BytesValueConverter();
    +      scan = new Scan();
         } catch (Exception ex) {
           throw new RuntimeException(ex);
         }
       }
     
       @Override
    -  public void beginWindow(long windowId)
    -  {
    -  }
    -
    -  @Override
    -  public void teardown()
    -  {
    -    try {
    -      store.disconnect();
    -    } catch (IOException ex) {
    -      throw new RuntimeException(ex);
    -    }
    -  }
    -
    -  @Override
    -  public void emitTuples()
    +  protected Object getTuple(Result result)
       {
         try {
    -      Scan scan = nextScan();
    -      if (scan == null)
    -        return;
    -
    -      ResultScanner resultScanner = store.getTable().getScanner(scan);
    -
    -      while (true) {
    -        Result result = resultScanner.next();
    -        if (result == null)
    -          break;
    -
    -        String readRow = Bytes.toString(result.getRow());
    -        if( readRow.equals( lastReadRow ))
    -          continue;
    -
    -        Object instance = pojoType.newInstance();
    -        rowSetter.set(instance, readRow);
    -
    -        List<Cell> cells = result.listCells();
    +      String readRow = Bytes.toString(result.getRow());
    +      if( readRow.equals( getLastReadRow() )) {
    --- End diff --
    
    This will need changes to the signature of getTuple. Here we are assuming that getTuple extracts the HBase row and then converts it to the output type. However, lastReadRow is stored in raw form. To move this check to the base class, we'll have to map the tuple back to a raw type and then check whether the record is the one we saw last.
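A minimal sketch of the approach the comment describes, under stated assumptions: plain Java with no HBase dependency, where RawRow stands in for an HBase Result, and AbstractRowScanSketch, getRowKey, emitBatch and StringRowScan are hypothetical names rather than Malhar APIs. The base class keeps lastReadRow in raw form and asks subclasses for the raw row key through a separate hook, so the duplicate check can run before the record is converted by getTuple:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: the base class owns the scan loop and the lastReadRow
// check; subclasses only supply the raw row key and the tuple conversion.
abstract class AbstractRowScanSketch<R, T> {
  private String lastReadRow; // kept in raw (row key) form

  // Hook 1 (hypothetical): map a raw record to its row key without building a tuple.
  protected abstract String getRowKey(R raw);

  // Hook 2: convert a raw record into the output tuple (e.g. a POJO).
  protected abstract T getTuple(R raw);

  // Drains one batch of raw records, skipping the row that was emitted last.
  public List<T> emitBatch(Iterator<R> scanner) {
    List<T> out = new ArrayList<>();
    while (scanner.hasNext()) {
      R raw = scanner.next();
      String key = getRowKey(raw);
      if (key.equals(lastReadRow)) {
        continue; // already emitted, e.g. when the next scan restarts at lastReadRow
      }
      out.add(getTuple(raw));
      lastReadRow = key;
    }
    return out;
  }

  public String getLastReadRow() {
    return lastReadRow;
  }
}

// Hypothetical stand-in for an HBase Result: a row key plus a single value.
final class RawRow {
  final String row;
  final String value;

  RawRow(String row, String value) {
    this.row = row;
    this.value = value;
  }
}

// Example subclass: the "POJO" here is just a formatted string.
final class StringRowScan extends AbstractRowScanSketch<RawRow, String> {
  @Override
  protected String getRowKey(RawRow raw) {
    return raw.row;
  }

  @Override
  protected String getTuple(RawRow raw) {
    return raw.row + "=" + raw.value;
  }
}
```

With this split, getTuple in the subclass no longer needs to know about lastReadRow; the cost is one extra abstract method (getRowKey) that every subclass has to implement.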


> Improve HBasePOJOInputOperator with support for threaded read
> -------------------------------------------------------------
>
>                 Key: APEXMALHAR-1957
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1957
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Bhupesh Chawda
>            Assignee: Bhupesh Chawda
>
> Add the following support to the HBase POJO Input Operator:
> * Add support for threaded read
> * Allow specifying a set of "column family:column" pairs and fetch data only for these columns
> * Allow specifying an end row key at which to stop scanning
> * Add metrics
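As an illustration of the threaded-read, end-row-key and metrics items, here is a hypothetical sketch (not the operator's actual design): a background thread pulls sorted row keys into a bounded queue, stops at an exclusive end row key, and counts rows as a simple metric, while the operator thread drains the queue without blocking, as emitTuples would. ThreadedRowReader, drain and isDone are illustrative names; row keys are modeled as Strings, whose compareTo agrees with HBase's byte-wise key ordering only for ASCII keys. Column selection is left out; with the HBase client it would presumably be expressed through Scan.addColumn(family, qualifier).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a threaded read: a reader thread fills a bounded
// queue and the operator thread drains it without blocking.
final class ThreadedRowReader {
  private final BlockingQueue<String> queue;
  private final AtomicLong rowsRead = new AtomicLong(); // simple metric
  private final Thread reader;
  private volatile boolean scanFinished; // written only by the reader thread

  // sortedRows stands in for a scanner; endRow is exclusive (null = no end row).
  ThreadedRowReader(Iterator<String> sortedRows, String endRow, int capacity) {
    queue = new ArrayBlockingQueue<>(capacity);
    reader = new Thread(() -> {
      try {
        while (sortedRows.hasNext()) {
          String row = sortedRows.next();
          if (endRow != null && row.compareTo(endRow) >= 0) {
            break; // reached the end row key: stop scanning
          }
          queue.put(row); // blocks while the queue is full (back-pressure)
          rowsRead.incrementAndGet();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // e.g. stop() was called
      } finally {
        scanFinished = true;
      }
    }, "row-reader");
    reader.setDaemon(true);
  }

  void start() {
    reader.start();
  }

  void stop() {
    reader.interrupt();
  }

  // Called from the operator thread (e.g. in emitTuples); never blocks.
  List<String> drain(int maxRows) {
    List<String> out = new ArrayList<>();
    String row;
    while (out.size() < maxRows && (row = queue.poll()) != null) {
      out.add(row);
    }
    return out;
  }

  // True once the reader has stopped and every queued row has been drained.
  boolean isDone() {
    return scanFinished && queue.isEmpty();
  }

  long getRowsRead() {
    return rowsRead.get();
  }
}
```

The bounded queue keeps the reader from running arbitrarily far ahead of the operator, and because drain uses poll rather than take, the operator thread never stalls waiting on HBase.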



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
