Flink SQL CLI error when reading an HBase table

2020-06-28 post
Hello,

I'm using Flink 1.10 and created an HBase table through sql-client:

CREATE TABLE dim_term (
  term_id STRING,
  info ROW(
    term_name STRING,
    term_name_combine STRING,
    term_notice STRING,
    term_remarks STRING,
    season STRING,
    term_sequence STRING,
    term_start_time STRING,
    term_end_time STRING,
    term_description STRING,
    term_status INT,
    is_mvp_term INT,
    ctime STRING,
    utime STRING
  )
) WITH (
  'connector.type' = 'hbase',
  'connector.version' = '1.4.3',
  'connector.table-name' = 'dim_term',
  'connector.zookeeper.quorum' = 'emr-header-1.cluster-109533:2181,emr-worker-1.cluster-109533:2181,emr-header-2.cluster-109533:2181',
  'connector.zookeeper.znode.parent' = '/hbase'
)

The problem is that when I run select * from dim_term in sql-client, it fails with:

2020-06-29 11:26:51,718 INFO  org.apache.flink.addons.hbase.HBaseRowInputFormat org.apache.flink.addons.hbase.HBaseRowInputFormat.configure(HBaseRowInputFormat.java:65) - Initializing HBase configuration.
2020-06-29 11:26:51,831 INFO  org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.<init>(RecoverableZooKeeper.java:120) - Process identifier=hconnection-0x57b9485d connecting to ZooKeeper ensemble=localhost:2181


The org.apache.flink.addons.hbase.HBaseRowInputFormat class is not picking up the ZooKeeper configuration from the DDL: as the log shows, it connects to ensemble=localhost:2181 instead of the configured quorum.
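
The ensemble=localhost:2181 line suggests the input format is falling back to whatever HBaseConfiguration.create() resolves on the classpath (localhost is the HBase client default) rather than to the connector properties in the DDL. A minimal diagnostic sketch, run with the same classpath as the SQL client, to see which quorum the HBase client actually resolves (the class name CheckHBaseQuorum is just an illustration); if it prints localhost, placing an hbase-site.xml with the real quorum on the Flink/SQL-client classpath is one possible workaround:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CheckHBaseQuorum {
    public static void main(String[] args) {
        // HBaseConfiguration.create() loads hbase-default.xml / hbase-site.xml
        // from the classpath; with no hbase-site.xml present, the quorum
        // defaults to localhost, matching the "ensemble=localhost:2181" log line.
        Configuration conf = HBaseConfiguration.create();
        System.out.println(conf.get("hbase.zookeeper.quorum"));
        System.out.println(conf.get("zookeeper.znode.parent"));
    }
}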
 

Flink: initialize a Savepoint from a Hive table query

2020-05-06 post
I'm using Flink 1.10 and want to bootstrap a Savepoint with data queried from a Hive table. The problem I'm running into is that I cannot convert the Table into a DataSet.

EnvironmentSettings settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
Table table = tableEnv.sqlQuery("select * from test001");

// currencyDataSet would have to be built from `table`, but the Blink batch
// TableEnvironment above offers no way to turn the Table into a DataSet.
BootstrapTransformation broadcastTransformation =
    OperatorTransformation
        .bootstrapWith(currencyDataSet)
        .transform(new CurrencyBootstrapFunction());

Savepoint
    .create(backend, 128)
    .withOperator(ACCOUNT_UID, transformation)
    .withOperator(CURRENCY_UID, broadcastTransformation)
    .write(savepointPath);
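
The State Processor API in Flink 1.10 is built on the DataSet API, while the Blink planner's batch TableEnvironment has no Table-to-DataSet bridge. A sketch of the built-in conversion path in 1.10, via the legacy planner's BatchTableEnvironment (this assumes test001 can be resolved there, e.g. through a registered catalog; whether the Hive connector supports the legacy batch planner in your setup needs to be verified):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class HiveTableToDataSet {
    public static void main(String[] args) throws Exception {
        // The State Processor API (1.10) bootstraps from a DataSet, so start
        // from the DataSet ExecutionEnvironment rather than a pure TableEnvironment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Legacy-planner BatchTableEnvironment: the only batch TableEnvironment
        // in 1.10 that can hand a Table back as a DataSet.
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // Register the catalog that contains test001 here (e.g. a Hive catalog)
        // so the query below resolves.

        Table table = tableEnv.sqlQuery("select * from test001");
        DataSet<Row> rows = tableEnv.toDataSet(table, Row.class);

        // `rows` can then be fed into OperatorTransformation.bootstrapWith(rows).
        rows.print();
    }
}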