Re: Inquiry about a problem running HiveCatalog on Flink 1.11.1
Please paste your code. On 2020/9/8 14:09, zhongbaoluo wrote: The data insert failed to execute, and no exception was found. yarn
Re: flink sql 1.11.1 could not insert hive orc record
Check whether your table is a transactional (ACID) table; when creating the table in Hive, add 'transactional' = 'false'.

On 2020/9/8 16:26, 大罗 wrote:
Hi, I'm using the Hive catalog feature of Flink SQL 1.11.1 to insert data into a Hive ORC table. The versions I'm using are:
Hadoop 3.0.0+cdh6.3.2
HDFS 3.0.0+cdh6.3.2
HBase 2.1.0+cdh6.3.2
Hive 2.1.1+cdh6.3.2
Flink 1.11.1
The Hive ORC table is defined as follows:
create table dest_orc (
  i int
) partitioned by (ts string)
stored as orc
TBLPROPERTIES(
  'orc.compress' = 'SNAPPY'
);
Inserting data from the Flink SQL client:
Flink SQL> insert into dest_orc select 1, '2020-09-08 10:11:00' ;
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: a2c96bcaf23abc24de8e5405ec2bb7c6
The error is as follows:
2020-09-08 16:16:39
org.apache.flink.connectors.hive.FlinkHiveException: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter
    at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:159)
    at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47)
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257)
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at StreamExecCalc$183.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter
    at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:58)
    at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151)
    ... 25 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55)
    ... 26 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
    at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getOptions(OrcOutputFormat.java:161)
    at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getHiveRecordWriter(OrcOutputFormat.java:189)
    at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getHiveRecordWriter(OrcOutputFormat.java:67)
    at
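For reference, a minimal sketch (not from the original messages) of how the suggested workaround could look if the sink table is created through Flink's Hive dialect; the catalog name, default database and hive-conf path are assumptions, and the same 'transactional' = 'false' property can equally be set when creating the table directly in Hive:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CreateNonTransactionalOrcTable {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Assumed catalog setup: the name, default database and hive-conf dir are placeholders.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive-conf");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // Switch to the Hive dialect so Hive DDL (STORED AS, TBLPROPERTIES) is accepted.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // Same table as in the thread, but explicitly non-transactional.
        tEnv.executeSql(
                "CREATE TABLE dest_orc (i INT) PARTITIONED BY (ts STRING) STORED AS ORC " +
                "TBLPROPERTIES ('orc.compress' = 'SNAPPY', 'transactional' = 'false')");
    }
}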
Re: Out-of-order data when consuming from Kafka
On the producer side, hash by user ID when sending to Kafka, so that all operations of a given user land in the same Kafka partition, and make sure the producer sends those operations in order. As for data getting out of order after Flink consumes from Kafka: that is unlikely, or rather extremely unlikely, since consumption follows the partition offsets.

On 2020/9/4 18:59, Xiao Xu wrote: Two approaches: 1. You can key the data in Kafka; each partition is ordered, so each user's records are processed in order. 2. As you said, handle the out-of-order data inside Flink.

宁吉浩 wrote on Fri, Sep 4, 2020 at 5:56 PM: I ran into the same problem as you: two records with a causal relationship that must be processed in strict order, although my business is not as strict as a bank's. My solution was to drop the late data and run the business computation, and to run a separate program that buffers the data in memory, sorts it, and then re-runs a correcting computation. It is the approach we used for our earlier real-time + offline warehouse: the code is written once, but it still has to run twice.
--
From: smq <374060...@qq.com> Sent: Friday, September 4, 2020 17:35 To: wwj ; user-zh Subject: Re: Out-of-order data when consuming from Kafka
In other words, the data written to Kafka is two records for the same user, with balances 0 and 100 respectively (the computation I described earlier, which is done in Oracle). My job is only to update the balance, i.e. 0 or 100, into the corresponding balance field in Kudu. Because consumption may be out of order, 100 gets updated first and then 0, so the value left in Kudu is 0, while the correct final value should be 100.
--- Original message --- From: "wwj"
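To make the producer-side suggestion concrete, here is a minimal, hypothetical sketch (not from the thread): using the user ID as the record key lets Kafka's default partitioner hash it, so all records of one user go to the same partition and keep their send order. The topic name, broker address and payloads are assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedBalanceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");              // assumed address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String userId = "user-42";                               // hypothetical user ID
            // Same key => same partition => per-user order is preserved.
            producer.send(new ProducerRecord<>("balance-topic", userId, "{\"balance\":0}"));
            producer.send(new ProducerRecord<>("balance-topic", userId, "{\"balance\":100}"));
        }
    }
}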
Re: Re: Please advise on a time window question, many thanks!
That's exactly it. For example, if you have multiple partitions but only one partition has data, the watermark will not advance and the computation will not fire. You need to make sure every partition has data. In my own testing, with one topic and 10 partitions, the test data was hashed by key into only one partition, so the computation never triggered. Once the key was rotated so that all 10 partitions had data, the computation triggered.

On 2020/9/4 13:14, Benchao Li wrote: If you have multiple partitions and some of them have no data, this problem does indeed occur. To handle this situation, have a look at idle sources [1].
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

samuel@ubtrobot.com wrote on Thu, Sep 3, 2020 at 3:41 PM: Some additional environment information. It looks similar to the following problem: while testing Flink SQL on 1.11, I found that when consuming Kafka with the streaming API using event time, then converting the stream to a table and running a SQL aggregation, the Flink web UI shows "No Watermark" and the aggregation never triggers when the Kafka topic has multiple partitions; when the topic has only one partition, the computation triggers normally and the watermarks display correctly. Not sure whether this is caused by the multiple Kafka partitions?

From: samuel@ubtrobot.com Sent: 2020-09-03 09:23 To: user-zh Subject: Re: Re: Please advise on a time window question, many thanks!
Thanks for the reply! It is Flink 1.11.1. The code is below:

package com.ubtechinc.dataplatform.flink.etl.exception.monitor;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ubtechinc.dataplatform.flink.util.AES256;
import com.ubtechinc.dataplatform.flink.util.ConstantStr;
import com.ubtechinc.dataplatform.flink.util.MailUtils;
import com.ubtechinc.dataplatform.flink.util.SmsUtil;
import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.mysql.jdbc.Connection;
import java.sql.Timestamp;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Use broadcast state to implement dynamic configuration updates
 */
public class ExceptionAlertHour4 {
    private static final Logger LOG = LoggerFactory.getLogger(ExceptionAlertHour4.class);
    public static void main(String[]
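As a minimal sketch of the idle-source handling suggested above (not from the thread; it assumes a Tuple4 stream whose field f3 carries the event timestamp in epoch milliseconds, mirroring the quoted code, and the 5 s / 10 s values are just illustrative):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;

public class IdleSourceWatermarks {
    /** Bounded-out-of-orderness watermarks plus idleness, so empty partitions stop holding back the watermark. */
    public static DataStream<Tuple4<String, String, String, Long>> withEventTime(
            DataStream<Tuple4<String, String, String, Long>> singleDS) {
        return singleDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withIdleness(Duration.ofSeconds(10))            // mark a partition idle after 10 s without data
                        .withTimestampAssigner((event, ts) -> event.f3)); // event time comes from field f3
    }
}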
Re: FlinkKafkaConsumer question
To guarantee exactly-once, Flink implements checkpointing itself via barriers (including barrier propagation and so on), so Flink wraps an additional layer of semantic guarantees on top of the KafkaConsumer.

On 2020/9/4 10:34, Shuiqiang Chen wrote: Hi, to guarantee exactly-once for a Flink program, each Kafka source operator must maintain the offsets of the partitions it consumes and write them into state at every checkpoint; on recovery from a checkpoint, consumption resumes from the last committed positions, which guarantees exactly-once. If Kafka consumer-group management were used, the partitions assigned to each parallel instance of the FlinkKafkaConsumer would be managed by Kafka's consumer group, and the offsets would be recorded by the group coordinator, so Flink would not be able to maintain this information itself.

On Sep 4, 2020, at 10:25 AM, op <520075...@qq.com> wrote: Thanks, but I still don't quite understand. Doesn't FlinkKafkaConsumer use KafkaConsumer underneath? Why doesn't Flink use Kafka's consumer-group management?
-- Original message -- From: "user-zh"
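A minimal, hypothetical sketch of the setup being described (topic, group and broker names are assumptions): with checkpointing enabled, the consumer snapshots its partition offsets into Flink state and restores from them, while the group.id is used only for offset-commit bookkeeping, not for partition assignment.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ExactlyOnceKafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are stored in checkpointed state; on restore, consumption resumes from them.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // assumed address
        props.setProperty("group.id", "my-flink-job");          // not used for partition assignment

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets(); // only the initial position; afterwards checkpointed offsets win

        env.addSource(consumer).print();
        env.execute("exactly-once kafka source sketch");
    }
}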
Re: Please advise on a time window question, many thanks!
Take a look at getWindowStartWithOffset in org.apache.flink.streaming.api.windowing.windows.TimeWindow to see how the window boundaries are computed. Your 17:00-18:00 window ends at 2020-09-01 18:00:00.0; to fire the window ending at 2020-09-01 18:00:00.0 the watermark has to pass it, i.e. you need data with a timestamp of at least 2020-09-01 18:00:00.001, and because the watermark is delayed by 5 seconds, you actually need data with a timestamp around 2020-09-01 18:00:05.001 before the window fires.

On 2020/9/2 15:20, samuel@ubtrobot.com wrote:
I'm using Flink SQL with a tumbling window.
// assign event time and watermark
DataStream> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        //.>forMonotonousTimestamps()
        .withIdleness(Duration.ofSeconds(10)) // watermark
        .withTimestampAssigner((event, timestamp) -> event.f3));
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
    "log",
    withTimestampsAndWatermarksDS,
    "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
String sql = "select appid,eventid,cnt,"
    + "(starttime + interval '8' hour ) as stime,"
    + "(endtime + interval '8' hour ) as etime "
    + "from (select appid,eventid,count(*) as cnt,"
    + "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime,"
    + "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime "
    + "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";
Table table = tenv.sqlQuery(sql);
DataStream dataStream = tenv.toAppendStream(table, Result.class);
Input data:
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
The window ending 2020-09-01 18:00:00.0 did not fire until the following record arrived:
(400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39
Then the output was:
ResultHour{appid=400030024, eventid=others, cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}
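For illustration, a small sketch (not from the thread) of the window-start arithmetic and the trigger condition with the numbers above; timestamps are epoch milliseconds, and the +8 hours is only for displaying UTC+8 local time:

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowStartDemo {
    public static void main(String[] args) {
        long event = 1598951712000L;             // 2020-09-01 17:15:12 (UTC+8)
        long windowSize = 60 * 60 * 1000L;       // 1-hour tumbling window
        long start = TimeWindow.getWindowStartWithOffset(event, 0, windowSize);
        long end = start + windowSize;
        System.out.println(start);               // 1598950800000 -> 2020-09-01 17:00:00 (UTC+8)
        System.out.println(end);                 // 1598954400000 -> 2020-09-01 18:00:00 (UTC+8)
        // With forBoundedOutOfOrderness(5 s), the watermark trails the max seen timestamp by 5 s,
        // so this window only fires once an event with a timestamp of roughly end + 5 s or later
        // arrives -- e.g. the 2020-09-02 15:16:39 record in the thread.
    }
}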
Re: A question about historical data in Flink 1.11 CDC
It should be. From the source code you can see that flink-json currently supports parsing two built-in JSON CDC formats, one for Canal and one for Debezium. For details see: org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema and org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.

On 2020/8/24 17:27, dixingxin...@163.com wrote: Hi all: Flink 1.11 CDC supports loading historical data; I'd like to confirm two things: 1. Does it use Debezium under the hood to load the historical data? 2. Does Debezium load the full historical data by querying the source table via JDBC, and could that put momentary pressure on the database? Hoping someone can help answer this, thanks. Best, Xingxing Di
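As an illustration of how those formats are wired up in SQL (a hypothetical sketch, not from the thread; table name, topic, servers and schema are assumptions), a Kafka source table can declare 'format' = 'debezium-json' (or 'canal-json') so that the CDC changelog is interpreted:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DebeziumSourceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
                "CREATE TABLE orders_cdc (" +
                "  id BIGINT," +
                "  amount DECIMAL(10, 2)" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'orders-changelog'," +                 // assumed topic
                "  'properties.bootstrap.servers' = 'broker:9092'," + // assumed address
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'debezium-json'" +                     // or 'canal-json'
                ")");
    }
}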
Re: flink 1.11.1 integration with Hive on HDP 3.0.1: cannot query Hive table data
Hive 3.0 creates transactional tables by default; add TBLPROPERTIES('transactional'='false') to your CREATE TABLE statement.

On 2020/8/24 15:43, 黄蓉 wrote: Thanks everyone: I have found the cause of the problem. Hive 3.1.0 in HDP 3.0.1 enables transactions by default, while Flink 1.11.0 apparently does not yet support transactions when writing to and reading from Hive tables, so the two are incompatible. After I turned off all the transaction-related settings in Hive, everything worked. Jessie jessie...@gmail.com
-- Original Message -- From: "taochanglian" To: user-zh@flink.apache.org Sent: 8/24/2020 5:28:56 AM Subject: Re: flink 1.11.1 integration with Hive on HDP 3.0.1: cannot query Hive table data
Did you download flink-sql-connector-hive-3.1.2 from https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/ and put it into the lib directory?
On 2020/8/24 3:01, 黄蓉 wrote: Hello everyone: My environment is the HDP 3.0.1 sandbox, with the latest Flink 1.11.1 downloaded as a pre-built package from the official site. I want to test the Flink/Hive integration, including querying data from Hive tables and writing data into them. The problem I'm running into is that queries through the Flink SQL client return no table data, and no error is reported either, even though the table does have records when queried in Hive. Other statements such as show tables and show databases display fine.
The configured Hadoop environment variables are:
export HADOOP_CONF_DIR="/etc/hadoop/conf"
export HADOOP_HOME="/usr/hdp/3.0.1.0-187/hadoop"
export HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:/usr/hdp/current/hadoop-mapreduce-client/*:/usr/hdp/current/hadoop-mapreduce-client/lib/*"
The sql-client configuration file is:
tables: []
functions: []
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /opt/hive-conf
execution:
  planner: blink
  type: batch
  result-mode: table
  max-table-result-rows: 100
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: myhive
  current-database: default
  restart-strategy:
    type: fallback
deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0
Is this happening because the official Flink package is incompatible with HDP 3.0.1? Do I need to recompile Flink myself? Jessie jessie...@gmail.com
Re: flink 1.11.1 integration with Hive on HDP 3.0.1: cannot query Hive table data
Did you download flink-sql-connector-hive-3.1.2 from https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/ and put it into the lib directory?

On 2020/8/24 3:01, 黄蓉 wrote: Hello everyone: My environment is the HDP 3.0.1 sandbox, with the latest Flink 1.11.1 downloaded as a pre-built package from the official site. I want to test the Flink/Hive integration, including querying data from Hive tables and writing data into them. The problem I'm running into is that queries through the Flink SQL client return no table data, and no error is reported either, even though the table does have records when queried in Hive. Other statements such as show tables and show databases display fine.
The configured Hadoop environment variables are:
export HADOOP_CONF_DIR="/etc/hadoop/conf"
export HADOOP_HOME="/usr/hdp/3.0.1.0-187/hadoop"
export HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:/usr/hdp/current/hadoop-mapreduce-client/*:/usr/hdp/current/hadoop-mapreduce-client/lib/*"
The sql-client configuration file is:
tables: []
functions: []
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /opt/hive-conf
execution:
  planner: blink
  type: batch
  result-mode: table
  max-table-result-rows: 100
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: myhive
  current-database: default
  restart-strategy:
    type: fallback
deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0
Is this happening because the official Flink package is incompatible with HDP 3.0.1? Do I need to recompile Flink myself? Jessie jessie...@gmail.com
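For comparison, a minimal sketch (not from the thread) of the same catalog configured programmatically through the Table API; the names and the hive-conf path simply mirror the YAML above, everything else is an assumption:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Same settings as the sql-client YAML: catalog "myhive", database "default",
        // Hive configuration read from /opt/hive-conf.
        HiveCatalog myhive = new HiveCatalog("myhive", "default", "/opt/hive-conf");
        tEnv.registerCatalog("myhive", myhive);
        tEnv.useCatalog("myhive");
        tEnv.useDatabase("default");

        tEnv.executeSql("SHOW TABLES").print(); // sanity check that the catalog is reachable
    }
}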
Re: flink sql job fails because of bad data
Is the data in your Kafka topic JSON? Is the format json?

String resultCreateTableSql = createKafkaSourceSQL
    + " WITH ( "
    + " 'connector' = 'kafka' ,"
    + " 'topic' = '" + kafkaTopic + "',"
    + " 'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',"
    + " 'properties.group.id' = '" + kafkaGroupId + "',"
    + " 'format' = '" + kafkaFormat + "',"
    + " 'scan.startup.mode' = '" + scanStartUpMode + "',"
    + " 'json.fail-on-missing-field' = 'false',"
    + " 'json.ignore-parse-errors' = 'true' )";

Have you added these two options, json.fail-on-missing-field and json.ignore-parse-errors? Did adding them not help?

On 2020/8/18 14:34, 赵一旦 wrote: I'm new to Flink SQL; my main point is that this problem is so obvious that anyone using it in production should already have a solution, yet it seems nobody is responding.
shizk233 wrote on Tue, Aug 18, 2020 at 2:26 PM: Consider modifying the JSON parsing logic to handle the bad data?
赵一旦 wrote on Tue, Aug 18, 2020 at 11:59 AM: Has anyone hit a similar problem in production, and how did you solve it? I mean with the pure Flink SQL approach; combining it with the DataStream API is easy to solve: just replace the error-prone part with the DataStream API and catch all exceptions there.
赵一旦 wrote on Mon, Aug 17, 2020 at 7:15 PM: With a Kafka source, a single bad record fails the whole job. How do you solve this? Previously, with the DataStream API, I parsed the data myself, so I could catch the exception and just count the number of invalid records as a Flink metric. Now the dynamic table is created directly on Kafka with Flink SQL, and a JSON parse failure while querying the table fails the job.
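A minimal sketch of the DataStream-style fallback mentioned in the thread (hypothetical names; it assumes fastjson is on the classpath, as in the other code quoted in this digest): parse inside a flatMap, drop malformed records, and count them in a metric instead of failing the job.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class TolerantJsonParser extends RichFlatMapFunction<String, JSONObject> {
    private transient Counter invalidRecords;

    @Override
    public void open(Configuration parameters) {
        // Exposed as a Flink metric instead of failing the job on bad input.
        invalidRecords = getRuntimeContext().getMetricGroup().counter("invalidJsonRecords");
    }

    @Override
    public void flatMap(String raw, Collector<JSONObject> out) {
        try {
            out.collect(JSON.parseObject(raw));
        } catch (Exception e) { // malformed JSON: count it and drop the record
            invalidRecords.inc();
        }
    }
}

The resulting stream can then be converted to a table for the SQL part of the job.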
Re: Print SQL connector does not work properly
Mine uses MySQL; I just run it directly and it works fine.

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String createMysqlTableSQL = "CREATE TABLE mysqlTable1 (\n"
    + " id INT,\n"
    + " username STRING,\n"
    + " age INT , \n"
    + " PRIMARY KEY (id) NOT ENFORCED\n"
    + ") WITH (\n"
    + " 'connector' = 'jdbc' ,\n"
    + " 'url' = 'jdbc:mysql://1.2.3.4:3306/bigdata',\n"
    + " 'table-name' = 'test' ,\n"
    + " 'username' = 'root' ,\n"
    + " 'password' = 'root' \n"
    + ")";
String print_table = " CREATE TABLE print_table ("
    + " id INT,"
    + " username STRING,"
    + " age INT"
    + " ) WITH ("
    + " 'connector' = 'print'"
    + " )";
tableEnv.executeSql(createMysqlTableSQL);
tableEnv.executeSql(print_table);
String querysql = "insert into print_table select * from mysqlTable1";
tableEnv.executeSql(querysql);

On 2020/8/18 11:57, xiao cai wrote: Hi china_tao: Hello, HBase is definitely not the problem. Can you use the print connector normally? Could you show me the correct way to use it? Thanks.
Original message From: china_tao To: user-zh Sent: Monday, August 17, 2020 23:00 Subject: Re: Print SQL connector does not work properly
String createHbaseSql = CREATE TABLE dimension (
    rowKey STRING,
    cf ROW,
    tas BIGINT
) WITH (
    'connector.type' = 'hbase',
    'connector.version' = '1.4.3',
    'connector.table-name' = 'test',
    'connector.write.buffer-flush.max-rows' = '10',
    'connector.zookeeper.quorum' = 'IP:port',
    'connector.zookeeper.znode.parent' = '/hbase',
);
tableEnv.executeSql(createHbaseSql);
Table queryTable = tableEnv.sqlQuery("select * from dimension");
tableEnv.toAppendStream(queryTable, Row.class).print();
Try this approach first and see whether it prints, to confirm that HBase itself is fine; then use print_table.
-- Sent from: http://apache-flink.147419.n8.nabble.com/