[ https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837092#comment-15837092 ]
ASF GitHub Bot commented on FLINK-2168: --------------------------------------- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3149#discussion_r97701594 --- Diff: flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java --- @@ -19,99 +19,113 @@ package org.apache.flink.addons.hbase; import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.LocatableInputSplitAssigner; -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.types.Row; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.HRegionLocator; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; import java.util.ArrayList; -import java.util.Date; import 
java.util.List; +import java.util.Map; /** * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row} */ -public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable<Row> { +public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class); private String tableName; - private TypeInformation[] fieldTypeInfos; - private String[] fieldNames; - private transient Table table; - private transient Scan scan; private transient Connection conn; - private ResultScanner resultScanner = null; - - private byte[] lastRow; - private int scannedRows; - private boolean endReached = false; - private org.apache.hadoop.conf.Configuration conf; - private static final String COLON = ":"; + private transient org.apache.hadoop.conf.Configuration conf; + private HBaseTableSchema schema; - public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) { - this.conf = conf; + public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) { this.tableName = tableName; - this.fieldNames = fieldNames; - this.fieldTypeInfos = fieldTypeInfos; + this.conf = conf; + this.schema = schema; } @Override public void configure(Configuration parameters) { LOG.info("Initializing HBaseConfiguration"); connectToTable(); if(table != null) { - scan = createScanner(); + scan = getScanner(); } } - private Scan createScanner() { + @Override + protected Scan getScanner() { + // TODO : Pass 'rowkey'. 
For this we need FilterableTableSource Scan scan = new Scan(); - for(String field : fieldNames) { + Map<String, List<Pair>> familyMap = schema.getFamilyMap(); + for(String family : familyMap.keySet()) { // select only the fields in the 'selectedFields' - String[] famCol = field.split(COLON); - scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1])); + List<Pair> colDetails = familyMap.get(family); + for(Pair<String, TypeInformation<?>> pair : colDetails) { + scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst())); + } } return scan; } + @Override + public String getTableName() { + return tableName; + } + + @Override + protected Row mapResultToTuple(Result res) { + List<Object> values = new ArrayList<Object>(); + int i = 0; + Map<String, List<Pair>> familyMap = schema.getFamilyMap(); + Row[] rows = new Row[familyMap.size()]; --- End diff -- Better to declare `rows` as `Object[]` to avoid confusion about whether `rows` is passed as a varargs or a non-varargs argument. > Add HBaseTableSource > -------------------- > > Key: FLINK-2168 > URL: https://issues.apache.org/jira/browse/FLINK-2168 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Affects Versions: 0.9 > Reporter: Fabian Hueske > Assignee: ramkrishna.s.vasudevan > Priority: Minor > Labels: starter > > Add a {{HBaseTableSource}} to read data from an HBase table. The > {{HBaseTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. > The implementation can be based on Flink's {{TableInputFormat}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)