[
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837198#comment-15837198
]
ASF GitHub Bot commented on FLINK-2168:
---------------------------------------
Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/3149#discussion_r97709518
--- Diff:
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
---
@@ -22,54 +22,63 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
/**
* Creates a table source that helps to scan data from an hbase table
*
* Note : the colNames are specified along with a familyName and they are
separated by a ':'
* For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier
name
*/
-public class HBaseTableSource implements BatchTableSource<Row>,
ProjectableTableSource<Row> {
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
private Configuration conf;
private String tableName;
- private byte[] rowKey;
- private String[] colNames;
- private TypeInformation<?>[] colTypes;
+ private HBaseTableSchema schema;
+ private String[] famNames;
- public HBaseTableSource(Configuration conf, String tableName, byte[]
rowKey, String[] colNames,
- TypeInformation<?>[]
colTypes) {
+ public HBaseTableSource(Configuration conf, String tableName,
HBaseTableSchema schema) {
this.conf = conf;
this.tableName = Preconditions.checkNotNull(tableName, "Table
name");
- this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
- this.colNames = Preconditions.checkNotNull(colNames, "Field
names");
- this.colTypes = Preconditions.checkNotNull(colTypes, "Field
types");
+ this.schema = Preconditions.checkNotNull(schema, "Schema");
+ Map<String, List<Pair>> familyMap = schema.getFamilyMap();
+ famNames = familyMap.keySet().toArray(new
String[familyMap.size()]);
}
@Override
public TypeInformation<Row> getReturnType() {
- return new RowTypeInfo(colTypes);
- }
-
- @Override
- public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
- return execEnv.createInput(new
HBaseTableSourceInputFormat(conf, tableName, colNames, colTypes),
getReturnType());
- }
+ // split the fieldNames
+ Map<String, List<Pair>> famMap = schema.getFamilyMap();
- @Override
- public ProjectableTableSource<Row> projectFields(int[] fields) {
- String[] newColNames = new String[fields.length];
- TypeInformation<?>[] newColTypes = new
TypeInformation<?>[fields.length];
+ List<String> qualNames = new ArrayList<String>();
--- End diff --
I will look into this.
> Add HBaseTableSource
> --------------------
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 0.9
> Reporter: Fabian Hueske
> Assignee: ramkrishna.s.vasudevan
> Priority: Minor
> Labels: starter
>
> Add a {{HBaseTableSource}} to read data from an HBase table. The
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}}
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)