[
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15257959#comment-15257959
]
ASF GitHub Bot commented on APEXMALHAR-1957:
--------------------------------------------
Github user sandeepdeshmukh commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/212#discussion_r61073466
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
---
@@ -69,116 +69,103 @@ public Object convertValue( HBaseFieldInfo fieldInfo,
Object value)
public void setup(OperatorContext context)
{
try {
- store.connect();
+ super.setup(context);
pojoType = Class.forName(pojoTypeName);
pojoType.newInstance(); //try create new instance to verify the
class.
rowSetter = PojoUtils.createSetter(pojoType,
tableInfo.getRowOrIdExpression(), String.class);
- fieldValueGenerator =
FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo()
);
+ fieldValueGenerator =
HBaseFieldValueGenerator.getHBaseFieldValueGenerator(pojoType,
tableInfo.getFieldsInfo() );
valueConverter = new BytesValueConverter();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
- public void beginWindow(long windowId)
- {
- }
-
- @Override
- public void teardown()
+ protected Object getTuple(Result result)
{
try {
- store.disconnect();
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- public void emitTuples()
- {
- try {
- Scan scan = nextScan();
- if (scan == null)
- return;
-
- ResultScanner resultScanner = store.getTable().getScanner(scan);
-
- while (true) {
- Result result = resultScanner.next();
- if (result == null)
- break;
-
- String readRow = Bytes.toString(result.getRow());
- if( readRow.equals( lastReadRow ))
- continue;
-
- Object instance = pojoType.newInstance();
- rowSetter.set(instance, readRow);
-
- List<Cell> cells = result.listCells();
+ String readRow = Bytes.toString(result.getRow());
+ if( readRow.equals( getLastReadRow() )) {
+ return null;
+ }
- for (Cell cell : cells) {
- String columnName =
Bytes.toString(CellUtil.cloneQualifier(cell));
- byte[] value = CellUtil.cloneValue(cell);
- fieldValueGenerator.setColumnValue( instance, columnName, value,
valueConverter );
- }
+ Object instance = pojoType.newInstance();
+ rowSetter.set(instance, readRow);
- outputPort.emit(instance);
- lastReadRow = readRow;
+ List<Cell> cells = result.listCells();
+ for (Cell cell : cells) {
+ String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
+ String columnFamily = Bytes.toString(CellUtil.cloneFamily(cell));
+ byte[] value = CellUtil.cloneValue(cell);
+
((HBaseFieldValueGenerator)fieldValueGenerator).setColumnValue(instance,
columnName, columnFamily, value,
+ valueConverter);
}
+ setLastReadRow(readRow);
+ return instance;
} catch (Exception e) {
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e);
}
-
- }
-
- protected Scan nextScan()
- {
- if(lastReadRow==null && startRow==null )
- return new Scan();
- else
- return new Scan( Bytes.toBytes( lastReadRow == null ? startRow :
lastReadRow ) );
}
- public HBaseStore getStore()
- {
- return store;
- }
- public void setStore(HBaseStore store)
+ @Override
+ protected Scan operationScan()
{
- this.store = store;
+ Scan scan;
+ if (getLastReadRow() == null && getStartRow() == null) {
+ // If no start row specified and no row read yet
+ scan = new Scan();
--- End diff --
new Scan() can be moved to setup().
> Improve HBasePOJOInputOperator with support for threaded read
> -------------------------------------------------------------
>
> Key: APEXMALHAR-1957
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1957
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Bhupesh Chawda
> Assignee: Bhupesh Chawda
>
> Add the following support to the HBase POJO Input Operator:
> * Add support for threaded read
> * Allow specifying a set of "column family: column" pairs and fetch data only
> for these columns
> * Allow specifying an end row key at which to stop scanning
> * Add metrics
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)