退订

2021-02-28 Thread china_tao
退订

kafka增加字段,hive表如何处理

2020-09-23 Thread china_tao
flink1.11.1,flink sql,已经实现flink sql 读取kafka,存储到hive。现在的问题是,kafka源增加字段了,flink
sql中的hive如何修改。直接在hive中增加字段的话,每次启动,会报 hive表已经存在,如果drop table if
exists的话,历史数据就会丢。请问大家是如何处理的,谢谢。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 维表延迟join

2020-08-26 Thread china_tao
一般来说,是先有维表数据,再有流数据。如果出现了你这样的情况,两个方式,一个使用left
join,使流表数据的维表信息为null,后期通过etl再补录;或者碰到异常,把消息打到另外一个kafka中,再进行异常处理或者补录处理,也可以理解为您说的那种5分钟,10分钟join一次。
个人推荐先用null存储,后期etl补录。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 【闫云鹏】Flink cdc 连接mysql5.7.25报错

2020-08-26 Thread china_tao
flink什么版本?用什么方式连接的?如果是flinksql的话,使用https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html,设置driver。
如果你mysql账号密码确定没有问题的话,可以在pom中把mysql的依赖去掉,把mysql连接的jar包房到flin的lib中,再提交一次试试。






--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.11 cdc使用

2020-08-23 Thread china_tao
支持。
insert into mysqlresult select k.vin,k.msgtime,d.brand_name from (SELECT
vin,max(msgtime) as msgtime,max(pts) as pts from kafkaSourceTable  group by
TUMBLE(rowtime, INTERVAL '10' SECOND),vin) AS k left join msyqlDimTable  FOR
SYSTEM_TIME AS OF k.pts AS d ON k.vin = d.vin

类似这样,先开10秒窗口获得kafka数据,然后join msyql维度表,然后插入mysql。
关键就是注意维度表lookup_cache_max-rows,lookup_cache_ttl这两个参数,设置维度表的更新时间。具体项目,具体对待,关键就是看看需要维度表支持多长时间的更新延迟。
另外,join维度表,目前应该只支持pts,不支持rowtime。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FlinkSQL sink 到 kafka partition 规则是怎样的?

2020-08-18 Thread china_tao
按我的理解,你描述错了吧,是kafka_ods_table里面的数据,是按照客户端自定义分区发送
过来的,接下来你希望处理完以后,还要保证kafka_dwd_table里面的数据,也需要保证数据按partition有序吧。因为你是把数据insert到kafka_dwd_table吧,你的描述有问题。

如果我理解的没有问题,那你看一下https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#sink-partitioner
,最关键的就是sink.partitioner这个参数的设置。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL sink 到 kafka partition 规则是怎样的?

2020-08-18 Thread china_tao
按我的理解,你描述错了吧,是kafka_ods_table里面的数据,是按照客户端自定义分区发送 
过来的,接下来你希望处理完以后,还要保证kafka_dwd_table里面的数据,也需要保证数据按partition有序吧。因为你是把数据insert到kafka_dwd_table吧,你的描述有问题。

如果我理解的没有问题,那你看一下https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#sink-partitioner
 

,最关键的就是sink.partitioner这个参数的设置。




> 在 2020年8月18日,下午5:17,wangl...@geekplus.com 写道:
> 
> 
> 直接用 FlinkSQL 实现抽取字段、字段转换的功能。
> 
> INSERT INTO kafka_dwd_table SELECT a, b, fun(c) FROM kafka_ods_table
> 
> kafka_dwd_table  topic 里面的 record 是客户端依照特定的 partition 规则发送过去的
> 经过 上面的 FlinkSQL 操作会怎样做 partition 呢? 
> 
> 谢谢,
> 王磊
> 
> 
> 
> wangl...@geekplus.com 
> 



Re: flink 1.11 order by rowtime报错

2020-08-18 Thread china_tao
错误呢?没看到。把代码贴出来看一下,是不是processtime没有设置或者设置不对



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql在实时数仓中,关联hbase维表频繁变化的问题

2020-08-18 Thread china_tao
个人觉得还是取舍的问题,我们现在用flink sql 做实时数仓,维度表暂时用mysql,与业务商定好更新事件后,配置flink sql
jdbc的lookup.cache.ttl参数来设置刷新时间,不知道你项目中,是维表数据变更后,需要秒级关联到消息中?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 order by rowtime报错

2020-08-18 Thread china_tao
没有看到错误,把代码贴出来把,是不是eventtime没有设置或者设置不正确



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.11 mysql 分页查询

2020-08-17 Thread china_tao
那肯定不行啊,我mysql表里面内容很多。FlinkSQL有没有直接分页查询的方法么?望赐教。类似于spark
dataframe中的dbtable,万分感谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Print SQL connector无法正常使用

2020-08-17 Thread china_tao
String createHbaseSql = CREATE TABLE dimension (
rowKey STRING,
cf ROW,
tas BIGINT
) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.table-name' = ’test',
'connector.write.buffer-flush.max-rows' = '10',
'connector.zookeeper.quorum' = ‘IP:port',
'connector.zookeeper.znode.parent' = '/hbase',
);
tableEnv.executeSql(createHbaseSql);
Table queryTable = tableEnv.sqlQuery("select * from dimension");
tableEnv.toAppendStream(queryTable, Row.class).print();

你先用这种方式,看看能不能打印出来,证明你hbase没有问题。然后在用print_table。 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink cdc能支持分库分表读取吗

2020-08-17 Thread china_tao
在数据库层面建view(view关联你的分库分表),然后flink操作这个view。就类似你用sqoop或者其它抽取工具的操作方式一样。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink1.11 mysql 分页查询

2020-08-17 Thread china_tao
您好,请教一个问题,flink sql 读取mysql如何分页查询。在spark中,dataframe可以通过dbtable,传入分页查询的语句。val
resultDF = session.read.format("jdbc")  .option("url",jdbcUrl) 
.option("dbtable" , selectSql )  .option("user",user) 
.options(writeOpts) 
.option("password",password).load()在flink中,通过connector,会读取全表么?String
insertSql = CREATE TABLE MyUserTable (  id BIGINT,  name STRING,  age INT, 
status BOOLEAN,  PRIMARY KEY (id) NOT ENFORCED) WITH (   'connector' =
'jdbc',   'url' = 'jdbc:mysql://localhost:3306/mydatabase',   'table-name' =
'users');tableEnv.executeSql(insertSql);以上的executesql会进行全表读取么?还是执行了下面的sql,才会读取内容?String
querysql = ”select * from MyUserTable limit 1 to 10“;
tableEnv.sqlQuery(querySql);执行上看的sqlQuery才会真正的读取数据吧。问题比较简单,只是有点懵,不知道跟spark是否有区别。谢谢
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink1.11 mysql 分页查询

2020-08-17 Thread china_tao
您好,请教一个问题,flink sql 读取mysql如何分页查询。
在spark中,dataframe可以通过dbtable,传入分页查询的语句。
val resultDF = session.read.format("jdbc")
  .option("url",jdbcUrl)
  .option("dbtable" , selectSql )
  .option("user",user)
  .options(writeOpts)
  .option("password",password).load()

在flink中,通过connector,会读取全表么?
String insertSql = CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);
tableEnv.executeSql(insertSql);
以上的executesql会进行全表读取么?
还是执行了下面的sql,才会读取内容?
String querysql = ”select * from MyUserTable limit 1 to 10“;
 tableEnv.sqlQuery(querySql);
执行上看的sqlQuery才会真正的读取数据吧。

问题比较简单,只是有点懵,不知道跟spark是否有区别。
谢谢
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink1.11 mysql 分页查询

2020-08-17 Thread china_tao
您好,请教一个问题,flink sql 读取mysql如何分页查询。
在spark中,dataframe可以通过dbtable,传入分页查询的语句。
val resultDF = session.read.format("jdbc")
  .option("url",jdbcUrl)
  .option("dbtable" , selectSql )
  .option("user",user)
  .options(writeOpts)
  .option("password",password).load()

在flink中,通过connector,会读取全表么?
String insertSql = CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);
tableEnv.executeSql(insertSql);
以上的executesql会进行全表读取么?
还是执行了下面的sql,才会读取内容?
String querysql = ”select * from MyUserTable limit 1 to 10“;
 tableEnv.sqlQuery(querySql);
执行上看的sqlQuery才会真正的读取数据吧。

问题比较简单,只是有点懵,不知道跟spark是否有区别。
谢谢
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/