Re: Question about a problem running HiveCatalog on Flink 1.11.1

2020-09-08 Post by taochanglian

Please paste your code.

On 2020/9/8 14:09, zhongbaoluo wrote:

The data insert failed to execute, and I could not find any exception either. yarn


Re: flink sql 1.11.1 could not insert hive orc record

2020-09-08 Post by taochanglian

Check whether your table is a transactional (ACID) table; when creating the table in Hive, add 'transactional' = 'false'.
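The same property can also be set when the table is created from the Flink side through the Hive dialect. A minimal sketch, assuming a HiveCatalog named "myhive" with a default database and a hive-conf directory that are placeholders rather than values from this thread:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CreateNonTransactionalOrcTable {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Placeholder catalog settings; point hive-conf-dir at your own Hive configuration.
        tableEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/opt/hive-conf"));
        tableEnv.useCatalog("myhive");

        // Switch to Hive DDL syntax so the Hive-style TBLPROPERTIES clause is accepted.
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql(
                "CREATE TABLE dest_orc (i INT) PARTITIONED BY (ts STRING) STORED AS ORC " +
                "TBLPROPERTIES ('orc.compress' = 'SNAPPY', 'transactional' = 'false')");
    }
}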

On 2020/9/8 16:26, 大罗 wrote:

Hi, I am using the Hive catalog feature of Flink SQL 1.11.1 to insert data into a Hive ORC table:

The versions I am using are as follows:

Hadoop 3.0.0+cdh6.3.2

HDFS 3.0.0+cdh6.3.2

HBase 2.1.0+cdh6.3.2

Hive 2.1.1+cdh6.3.2

Flink 1.11.1

The Hive ORC table is defined as follows:
create table dest_orc (
  i int
) partitioned by (ts string)
stored as orc
TBLPROPERTIES(
  'orc.compress' = 'SNAPPY'
);

Inserting data in Flink SQL:
Flink SQL> insert into dest_orc select 1,  '2020-09-08 10:11:00' ;
[INFO] Table update statement has been successfully submitted to the
cluster:
Job ID: a2c96bcaf23abc24de8e5405ec2bb7c6

The error is as follows:
2020-09-08 16:16:39
org.apache.flink.connectors.hive.FlinkHiveException: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter
	at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:159)
	at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47)
	at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257)
	at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
	at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at StreamExecCalc$183.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter
	at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:58)
	at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151)
	... 25 more
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55)
	... 26 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
	at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getOptions(OrcOutputFormat.java:161)
	at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getHiveRecordWriter(OrcOutputFormat.java:189)
	at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.getHiveRecordWriter(OrcOutputFormat.java:67)
	at

Re: Out-of-order data when consuming from Kafka

2020-09-06 Post by taochanglian
On the producer side, hash by user ID when writing to Kafka, so that all of a user's operations land in the same Kafka partition, and make sure the operations are sent in order.


As for Flink reordering the data after consuming from Kafka: that is unlikely, or rather extremely unlikely, since records are consumed strictly by offset.

On 2020/9/4 18:59, Xiao Xu wrote:

Two options:
1. Key the data by user in Kafka; each partition is ordered, so every user's records are processed in order (see the sketch below).
2. Handle the out-of-order data inside Flink, as you described.
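A minimal sketch of option 1, assuming the producer already partitions the topic by user ID and that the user ID is the first comma-separated field of each record; the topic name, broker address and group id are placeholders:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerUserOrderingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "balance-sync");         // placeholder

        env.addSource(new FlinkKafkaConsumer<>("balance-topic", new SimpleStringSchema(), props))
                // key by the user id; one user's records all come from the same partition and keep their order
                .keyBy(record -> record.split(",")[0])
                // everything for the same user is now handled by the same subtask, in arrival order
                .print();

        env.execute("per-user-ordering-sketch");
    }
}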

宁吉浩 wrote on Friday, September 4, 2020 at 17:56:


I ran into the same problem as you: two records with a causal relationship that must be processed in a strict order, although my business is not as strict as banking.
My workaround is to drop the late data and run the business computation,
and to have a separate job buffer the data in memory, sort it, and then recompute to correct the result.
This is the approach our earlier real-time + offline warehouse used: the code is written once, but it has to run twice.


--
From: smq <374060...@qq.com>
Sent: Friday, September 4, 2020 17:35
To: wwj ; user-zh
Subject: Re: Out-of-order data when consuming from Kafka


In other words, the data written to Kafka is two records for the same user, with balances 0 and 100 respectively; that is the computation I described earlier, which is done in Oracle. I am only responsible for updating the balance, i.e. 0 or 100, into the corresponding balance field in Kudu. Because consumption may be out of order, 100 may be applied first and then 0, which leaves 0 in Kudu, while the correct final value should be 100.
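If every update carries a monotonically increasing operation timestamp or version (an assumption, not something stated in this thread), the Kudu write can be made insensitive to arrival order by keeping the newest version per user in keyed state and dropping anything older. A rough sketch with an invented record layout of (userId, balance, opTimestamp):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Input: (userId, balance, opTimestamp). Only updates newer than anything seen so far are emitted downstream.
public class LatestUpdateWins extends KeyedProcessFunction<String, Tuple3<String, Long, Long>, Tuple3<String, Long, Long>> {

    private transient ValueState<Long> lastOpTs;

    @Override
    public void open(Configuration parameters) {
        lastOpTs = getRuntimeContext().getState(new ValueStateDescriptor<>("lastOpTs", Types.LONG));
    }

    @Override
    public void processElement(Tuple3<String, Long, Long> update, Context ctx,
                               Collector<Tuple3<String, Long, Long>> out) throws Exception {
        Long seen = lastOpTs.value();
        if (seen == null || update.f2 > seen) {
            lastOpTs.update(update.f2);
            out.collect(update); // newer than anything seen so far: let it reach the Kudu sink
        }
        // otherwise it is a stale, out-of-order update; dropping it keeps the final value correct
    }
}

It would be wired in as something like updates.keyBy(u -> u.f0).process(new LatestUpdateWins()) in front of the sink.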

--- Original message ---
From: "wwj"



Re: Re: Asking for advice on a time window problem, many thanks!

2020-09-04 Post by taochanglian

That is indeed the case. For example, if you have multiple partitions but only one of them carries data, the watermark will not advance and the computation will not fire. You have to make sure that every partition has data.

To give an example: when I was testing here with a topic of 10 partitions, the data was hashed by key and, in the test environment, only went into 1 partition, so the computation was never triggered. Once the keys were spread so that all 10 partitions had data, the computation fired.


On 2020/9/4 13:14, Benchao Li wrote:

If you have multiple partitions and some of them carry no data, then this problem does indeed occur.
To handle this situation, take a look at idle sources [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
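A minimal sketch of the idle-source setting from [1]; the tuple type, the 5-second out-of-orderness and the 30-second idleness are example values only:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;

public class IdleSourceSketch {
    // A source split/partition that produces no data for 30 seconds is marked idle,
    // so it no longer holds back the overall watermark.
    static final WatermarkStrategy<Tuple2<String, Long>> STRATEGY =
            WatermarkStrategy
                    .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withIdleness(Duration.ofSeconds(30))
                    .withTimestampAssigner((event, timestamp) -> event.f1);
}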

samuel@ubtrobot.com wrote on Thursday, September 3, 2020 at 15:41:


Some additional environment information:

It looks similar to the following issue: while testing Flink SQL on 1.11 I found that when consuming Kafka with the streaming API
using event time, then converting the stream to a table and running a SQL aggregation, the Flink web UI
shows "No Watermark" whenever the Kafka topic has multiple partitions, and the aggregation never fires; but when the
topic has only one partition, the computation triggers normally and the watermarks display correctly.

I am not sure whether this is caused by the Kafka topic having multiple partitions?



From: samuel@ubtrobot.com
Sent: 2020-09-03 09:23
To: user-zh
Subject: Re: Re: Asking for advice on a time window problem, many thanks!
Thanks for the reply!

It is Flink 1.11.1.

The code is as follows:
package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ubtechinc.dataplatform.flink.util.AES256;
import com.ubtechinc.dataplatform.flink.util.ConstantStr;
import com.ubtechinc.dataplatform.flink.util.MailUtils;
import com.ubtechinc.dataplatform.flink.util.SmsUtil;
import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.mysql.jdbc.Connection;

import java.sql.Timestamp;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
  * Use broadcast state to implement dynamic configuration updates.
  */
public class ExceptionAlertHour4{

private static final Logger LOG =
LoggerFactory.getLogger(ExceptionAlertHour4.class);

public static void main(String[] 

Re: Question about FlinkKafkaConsumer

2020-09-03 Post by taochanglian

To guarantee exactly-once, Flink implements checkpointing itself via barriers, including barrier propagation and so on, so on top of the Kafka consumer Flink wraps its own layer of semantic guarantees.

On 2020/9/4 10:34, Shuiqiang Chen wrote:


Hi,
To guarantee exactly-once for a Flink job, each Kafka source operator must itself maintain the offsets of the partitions it consumes,
write this information into state on every checkpoint, and, when recovering from a checkpoint, resume from the last committed
position; that is what guarantees exactly-once. If Kafka consumer-group management were used, the partitions assigned to the
parallel instances of FlinkKafkaConsumer would be managed by Kafka's group coordinator and the offsets would also be recorded by it, so Flink could not maintain this information.
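A rough sketch of what that looks like in code; the topic, broker address and group id are placeholders. Checkpointing is enabled so the offsets live in Flink state, and committing offsets back to Kafka on checkpoints is optional and only useful for external monitoring:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ExactlyOnceSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Offsets are snapshotted into Flink state on every checkpoint and restored from there on recovery.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "my-flink-job");         // mostly used for offset reporting

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Optional: also commit offsets to Kafka on checkpoints, purely so external tools can see the lag.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("exactly-once-source-sketch");
    }
}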


On September 4, 2020 at 10:25, op <520075...@qq.com> wrote:

Thanks, but I still do not quite understand: doesn't FlinkKafkaConsumer use KafkaConsumer underneath? Why doesn't Flink use Kafka's consumer-group management?


-- Original message --
From:
   "user-zh"
 



Re: Asking for advice on a time window problem, many thanks!

2020-09-02 Post by taochanglian
Take a look at org.apache.flink.streaming.api.windowing.windows.TimeWindow.
The window start is computed by getWindowStartWithOffset, so your records fall into the 17:00-18:00 window, whose end is
2020-09-01 18:00:00.0.
That window, [2020-09-01 17:00:00.0, 2020-09-01 18:00:00.0), only fires once the watermark passes its end, i.e. once a record
with an event time of at least 2020-09-01 18:00:00.001 has been seen.

With your 5-second watermark delay, that means a record with an event time of at least 2020-09-01 18:00:05.001 is needed before the result is emitted.
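For reference, the window assignment mentioned above boils down to roughly the following computation; the worked example reuses the timestamp from this thread (times in UTC+8):

// Roughly what TimeWindow.getWindowStartWithOffset does for a tumbling window.
public class WindowStartSketch {

    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 60 * 60 * 1000L;  // 1 hour
        long ts = 1598951712000L;           // 2020-09-01 17:15:12
        long start = getWindowStartWithOffset(ts, 0, windowSize);
        // start = 1598950800000 -> 2020-09-01 17:00:00, i.e. the window [17:00:00.0, 18:00:00.0)
        System.out.println(start);
    }
}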

On 2020/9/2 15:20, samuel@ubtrobot.com wrote:

I am using a Flink SQL tumble window.

// assign event time and watermarks
DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS =
singleDS.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
//.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
.withIdleness(Duration.ofSeconds(10))   // mark idle partitions so the watermark can still advance
.withTimestampAssigner((event, timestamp) -> event.f3));

StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
"log",
withTimestampsAndWatermarksDS,
"appid,bugid,eventid,rowtime.rowtime,proctime.proctime");

String sql = "select appid,eventid,cnt," +
 "(starttime + interval '8' hour ) as stime," +
 "(endtime + interval '8' hour ) as etime  " +
 "from (select appid,eventid,count(*) as cnt," +
 "TUMBLE_START(rowtime,INTERVAL '1' HOUR)  as starttime," +
 "TUMBLE_END(rowtime,INTERVAL '1' HOUR)  as endtime  " +
 "from log  group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' 
HOUR),TIME '00:00:00')";//

Table table = tenv.sqlQuery(sql);
DataStream dataStream = tenv.toAppendStream(table, Result.class);

The input data is:
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 (these four records belong to the window ending at
2020-09-01 18:00:00.0, but no result is emitted for it at that point)
(400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 (only after this record arrives is the result below produced)

ResultHour{appid=400030024, eventid=others, cnt=4, stime=2020-09-01 17:00:00.0,
etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}



Re: A question about historical data with Flink 1.11 CDC

2020-08-24 Post by taochanglian

It should be. From the source code you can see that flink-json currently supports parsing two built-in JSON formats, one for Canal and one for Debezium.

For details, see
org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema and
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema
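As an illustration of the Debezium side only (the table name, columns, topic and broker address below are invented; this assumes a Kafka topic that carries Debezium-formatted change records):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DebeziumJsonSourceSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // With 'format' = 'debezium-json', the insert/update/delete changelog records in the topic
        // are decoded by DebeziumJsonDeserializationSchema.
        tableEnv.executeSql(
                "CREATE TABLE orders_cdc (" +
                "  order_id BIGINT," +
                "  amount DECIMAL(10, 2)" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'db.orders'," +
                "  'properties.bootstrap.servers' = 'broker:9092'," +
                "  'properties.group.id' = 'cdc-demo'," +
                "  'format' = 'debezium-json'" +
                ")");
    }
}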


On 2020/8/24 17:27, dixingxin...@163.com wrote:

Hi all:
Flink 1.11's CDC supports loading historical data, and there are two points I would like to confirm:
1. Is Debezium used under the hood to load the historical data?
2. Does Debezium load the full historical data by querying the source table over JDBC? Could that put momentary pressure on the database?

Hoping someone can help answer this, thanks.


Best,
Xingxing Di




Re: Flink 1.11.1 integration with Hive on HDP 3.0.1: cannot query Hive table data

2020-08-24 Post by taochanglian

Hive 3.0 creates transactional tables by default; add TBLPROPERTIES('transactional'='false') to the CREATE TABLE statement.


On 2020/8/24 15:43, 黄蓉 wrote:

Thank you all:

I have found the cause of the problem: Hive 3.1.0 in HDP 3.0.1 enables transactions by default, while Flink
1.11.0 apparently does not yet support transactions when reading from or writing to Hive tables, so the two are incompatible. After I turned off all the transaction-related settings in Hive, it works normally.



Jessie
jessie...@gmail.com

-- Original Message --
From: "taochanglian" 
To: user-zh@flink.apache.org
Sent: 8/24/2020 5:28:56 AM
Subject: Re: Flink 1.11.1 integration with Hive on HDP 3.0.1: cannot query Hive table data

Did you download flink-sql-connector-hive-3.1.2 as described at
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/ and put it into the lib directory?


On 2020/8/24 3:01, 黄蓉 wrote:

Hello everyone:

My environment is the HDP 3.0.1 sandbox with the latest Flink 1.11.1, using the pre-built binaries downloaded from the official website. I want to test the Flink/Hive
integration, including querying data from Hive tables and writing data into Hive tables. The problem I am running into is that querying through the Flink SQL
client returns no table data and reports no error either, yet the same table does have records when queried in Hive. Other statements such as SHOW
TABLES and SHOW DATABASES display normally.


The configured Hadoop environment variables are as follows:
export HADOOP_CONF_DIR="/etc/hadoop/conf"
export HADOOP_HOME="/usr/hdp/3.0.1.0-187/hadoop"
export HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:/usr/hdp/current/hadoop-mapreduce-client/*:/usr/hdp/current/hadoop-mapreduce-client/lib/*"


The sql-client configuration file is as follows:
tables: []
functions: []
catalogs:
   - name: myhive
 type: hive
 hive-conf-dir: /opt/hive-conf
execution:
  planner: blink
  type: batch
  result-mode: table
  max-table-result-rows: 100
  parallelism: 3
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: myhive
  current-database: default
  restart-strategy:
    type: fallback
deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0


Is this situation caused by the official Flink distribution being incompatible with HDP 3.0.1? Do I need to recompile Flink myself?



Jessie
jessie...@gmail.com





Re: Bad data causing a Flink SQL job to fail

2020-08-18 Post by taochanglian

Is the data in your Kafka topic JSON? Is the format set to json?

String resultCreateTableSql = createKafkaSourceSQL +
        " WITH ( " +
        " 'connector' = 'kafka' ," +
        " 'topic' = '" + kafkaTopic + "'," +
        " 'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "'," +
        " 'properties.group.id' = '" + kafkaGroupId + "'," +
        " 'format' = '" + kafkaFormat + "'," +
        " 'scan.startup.mode' = '" + scanStartUpMode + "'," +
        " 'json.fail-on-missing-field' = 'false'," +
        " 'json.ignore-parse-errors' = 'true' )";

json.fail-on-missing-field
json.ignore-parse-errors
Have you added these two options? If you added them, did they not help?

On 2020/8/18 14:34, 赵一旦 wrote:

I have only just started with Flink SQL. Mainly, this problem seems so obvious that anyone using Flink SQL in production should already have a solution, yet hardly anyone seems to respond.

shizk233 wrote on Tuesday, August 18, 2020 at 14:26:


Consider modifying the JSON parsing logic to handle the bad data?

赵一旦 wrote on Tuesday, August 18, 2020 at 11:59:


Has anyone hit a similar problem in production, and how did you solve it?
I mean with the pure Flink SQL approach; combining it with the DataStream API is easy enough to solve: just replace the error-prone part with the DataStream
API and catch all exceptions there.

赵一旦 wrote on Monday, August 17, 2020 at 19:15:


With a Kafka source, a single bad record causes the whole job to fail. How can this be solved?

Previously, with the DataStream API, I parsed the data myself, so I could catch exceptions and only count the number of invalid records as a Flink metric.

Now the dynamic table is created with Flink SQL directly on Kafka, and a JSON parse failure while querying the dynamic table makes the job fail.
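A small sketch of that DataStream-side workaround, assuming fastjson for parsing and an invented metric name; the point is only that parse failures are counted instead of failing the task:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Parses raw Kafka strings; bad records are counted in a metric and skipped instead of failing the job.
public class TolerantJsonParser extends RichFlatMapFunction<String, JSONObject> {

    private transient Counter invalidRecords;

    @Override
    public void open(Configuration parameters) {
        invalidRecords = getRuntimeContext().getMetricGroup().counter("invalidRecords");
    }

    @Override
    public void flatMap(String raw, Collector<JSONObject> out) {
        try {
            out.collect(JSON.parseObject(raw));
        } catch (Exception e) {
            invalidRecords.inc(); // only count the bad record, do not fail the task
        }
    }
}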



Re: Print SQL connector not working properly

2020-08-18 Post by taochanglian

Mine reads from MySQL; it runs directly without any problem:


EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String createMysqlTableSQL = "CREATE TABLE mysqlTable1 (\n" +
        " id INT,\n" +
        " username STRING,\n" +
        " age INT , \n" +
        " PRIMARY KEY (id) NOT ENFORCED\n" +
        ") WITH (\n" +
        " 'connector' = 'jdbc' ,\n" +
        " 'url' = 'jdbc:mysql://1.2.3.4:3306/bigdata',\n" +
        " 'table-name' = 'test' ,\n" +
        " 'username' = 'root' ,\n" +
        " 'password' = 'root' \n" +
        ")";
String print_table = " CREATE TABLE print_table (" +
        " id INT," +
        " username STRING," +
        " age INT" +
        " ) WITH (" +
        " 'connector' = 'print'" +
        " )";
tableEnv.executeSql(createMysqlTableSQL);
tableEnv.executeSql(print_table);

String querysql = "insert into print_table select * from mysqlTable1";

tableEnv.executeSql(querysql);

On 2020/8/18 11:57, xiao cai wrote:

Hi china_tao:
Hello, HBase itself is definitely fine. Can you use the print connector normally? Could you show me the correct way to use it? Thanks.


Original message
From: china_tao
To: user-zh
Sent: Monday, August 17, 2020 23:00
Subject: Re: Print SQL connector not working properly


String createHbaseSql = "CREATE TABLE dimension ( rowKey STRING, cf ROW, tas BIGINT ) WITH ( " +
        "'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = 'test', " +
        "'connector.write.buffer-flush.max-rows' = '10', 'connector.zookeeper.quorum' = 'IP:port', " +
        "'connector.zookeeper.znode.parent' = '/hbase' )";
tableEnv.executeSql(createHbaseSql);
Table queryTable = tableEnv.sqlQuery("select * from dimension");
tableEnv.toAppendStream(queryTable, Row.class).print();

Try this first and see whether it prints anything, to confirm that HBase itself is fine. Then switch to using print_table.

-- Sent from: http://apache-flink.147419.n8.nabble.com/