[
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837201#comment-15837201
]
ASF GitHub Bot commented on FLINK-2168:
---------------------------------------
Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/3149#discussion_r97709590
--- Diff:
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
---
@@ -19,99 +19,113 @@
package org.apache.flink.addons.hbase;
import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.types.Row;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
import java.util.ArrayList;
-import java.util.Date;
import java.util.List;
+import java.util.Map;
/**
* {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}
*/
-public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable<Row> {
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
private String tableName;
- private TypeInformation[] fieldTypeInfos;
- private String[] fieldNames;
- private transient Table table;
- private transient Scan scan;
private transient Connection conn;
- private ResultScanner resultScanner = null;
-
- private byte[] lastRow;
- private int scannedRows;
- private boolean endReached = false;
- private org.apache.hadoop.conf.Configuration conf;
- private static final String COLON = ":";
+ private transient org.apache.hadoop.conf.Configuration conf;
+ private HBaseTableSchema schema;
- public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
- this.conf = conf;
+ public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
this.tableName = tableName;
- this.fieldNames = fieldNames;
- this.fieldTypeInfos = fieldTypeInfos;
+ this.conf = conf;
+ this.schema = schema;
}
@Override
public void configure(Configuration parameters) {
LOG.info("Initializing HBaseConfiguration");
connectToTable();
if(table != null) {
- scan = createScanner();
+ scan = getScanner();
}
}
- private Scan createScanner() {
+ @Override
+ protected Scan getScanner() {
+ // TODO : Pass 'rowkey'. For this we need FilterableTableSource
Scan scan = new Scan();
- for(String field : fieldNames) {
+ Map<String, List<Pair>> familyMap = schema.getFamilyMap();
+ for(String family : familyMap.keySet()) {
// select only the fields in the 'selectedFields'
- String[] famCol = field.split(COLON);
- scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
+ List<Pair> colDetails = familyMap.get(family);
+ for(Pair<String, TypeInformation<?>> pair : colDetails) {
+ scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+ }
}
return scan;
}
+ @Override
+ public String getTableName() {
+ return tableName;
+ }
+
+ @Override
+ protected Row mapResultToTuple(Result res) {
+ List<Object> values = new ArrayList<Object>();
+ int i = 0;
+ Map<String, List<Pair>> familyMap = schema.getFamilyMap();
+ Row[] rows = new Row[familyMap.size()];
--- End diff --
Will check this too. Thanks for all the nice comments, @wuchong.
> Add HBaseTableSource
> --------------------
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 0.9
> Reporter: Fabian Hueske
> Assignee: ramkrishna.s.vasudevan
> Priority: Minor
> Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}}
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)