[
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15257975#comment-15257975
]
ASF GitHub Bot commented on APEXMALHAR-1957:
--------------------------------------------
Github user sandeepdeshmukh commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/212#discussion_r61074763
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java ---
@@ -40,27 +51,106 @@
* @tags hbase, scan, input operator
* @since 0.3.2
*/
-public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T>
+public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T>
implements Operator.ActivationListener<Context>
{
+ public static final int DEF_HINT_SCAN_LOOKAHEAD = 2;
+ public static final int DEF_QUEUE_SIZE = 1000;
+ public static final int DEF_SLEEP_MILLIS = 10;
+
+ private String startRow;
+ private String endRow;
+ private String lastReadRow;
+ private int hintScanLookahead = DEF_HINT_SCAN_LOOKAHEAD;
+ private int queueSize = DEF_QUEUE_SIZE;
+ private int sleepMillis = DEF_SLEEP_MILLIS;
+ private Queue<Result> resultQueue;
+
+ @AutoMetric
+ protected long tuplesRead;
+
+ // Transients
+ protected transient Scan scan;
+ protected transient ResultScanner scanner;
+ protected transient Thread readThread;
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ resultQueue = Queues.newLinkedBlockingQueue(queueSize);
+ }
+
+ @Override
+ public void activate(Context context)
+ {
+ startReadThread();
+ }
+
+ protected void startReadThread()
+ {
+ try {
+ scan = operationScan();
+ scanner = getStore().getTable().getScanner(scan);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ readThread = new Thread(new Runnable() {
+ @Override
+ public void run()
+ {
+ try {
+ Result result;
+ while ((result = scanner.next()) != null) {
+ while (!resultQueue.offer(result)) {
+ Thread.sleep(sleepMillis);
+ }
+ }
+ } catch (Exception e) {
+ logger.debug("Exception in fetching results {}", e.getMessage());
+ throw new RuntimeException(e);
+ } finally {
+ scanner.close();
+ }
+ }
+ });
+ readThread.start();
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ tuplesRead = 0;
+ }
@Override
public void emitTuples()
{
+ if (!readThread.isAlive() && resultQueue.isEmpty()) {
+ startReadThread();
+ }
try {
- HTable table = getTable();
- Scan scan = operationScan();
- ResultScanner scanner = table.getScanner(scan);
- for (Result result : scanner) {
- //KeyValue[] kvs = result.raw();
- //T t = getTuple(kvs);
- T t = getTuple(result);
- outputPort.emit(t);
+ Result result = resultQueue.poll();
+ if (result == null) {
+ Thread.sleep(sleepMillis);
--- End diff --
There is no need to sleep here: if a call to emitTuples() emits no tuple, the
engine itself handles the idle wait, so the method can simply return when the
queue is empty.
> Improve HBasePOJOInputOperator with support for threaded read
> -------------------------------------------------------------
>
> Key: APEXMALHAR-1957
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1957
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Bhupesh Chawda
> Assignee: Bhupesh Chawda
>
> Add the following support to the HBase POJO Input Operator:
> * Add support for threaded reads
> * Allow specifying a set of "column family: column" pairs and fetching data
> only for those columns
> * Allow specifying an end row key at which to stop scanning
> * Add metrics
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)