Re: flink1.9.1用采用-d(分离模式提交)作业报错,但是不加-d是可以正常跑的

2020-08-17 文章 Congxian Qiu
Hi
   像我之前说的那样,加 -d 和不加 -d 使用的是不同的模式启动作业的。从你的报错栈来看,应该是类冲突了。你可以看下这个文档[1] 看看能否帮助你
java.lang.NoSuchMethodError:
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
at
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:279)
at
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/debugging_classloading.html
Best,
Congxian


bradyMk  于2020年8月17日周一 下午2:36写道:

> 您好:
>
> 我没有尝试过新版本,但是觉得好像不是版本的问题,因为我其他所有flink作业加上-d都能正常运行,就这个不行,并且如果我不用(-d)提交,这个也是可以运行的。我也很奇怪
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 在sink 执行notifyCheckpointComplete 方法时能否收到上游 snapshot collect 发送的数据?

2020-08-17 文章 Congxian Qiu
Hi
notifyCheckpointComplete 是整个 checkpoint 完成后调用的(也就是所有算子都做完了 snapshot,而且
JM 也做完了一些其他的工作),你的需求看上去只是要在算子间做一些顺序操作,这个应该不需要依赖 notifyCheckpointComplete
的,你可以自己写一个逻辑,在 submit 收集到 N 个信号后再做相应的事情。
Best,
Congxian


key lou  于2020年8月17日周一 上午11:42写道:

> 谢谢 解答。也就是假如 A->B 这样一个 graph。在一次checkpoint 中 A 调用  snapshot 往下游发的数据,在B 执行
> notifyCheckpointComplete 与 Asnapshot 下发的数据到达B   这2者没有必然的先后顺序。
>
> 另外就是 如果没有先后顺序,有没有什么办法 或者是在 B执行 某某操作前 能确保 这次 checkpoint 中 A  snapshot  发出的数据
> 到达了B.
>
>  我的场景是 有3个核心算子  start->proccess->submit . 其中 start和 submit 并行度为1, proccess
> 并行度为N, start  会开启一个事务 编号proccess  用这个事务 编号
> 去做预处理(赞一批处理一次,并把这一次处理结果下发,给下游做事务提交),  submit  收到上游批处理的结果 用 同样的事务编号去提交
>
>
> Congxian Qiu  于2020年8月17日周一 上午10:42写道:
>
> > Hi
> > 上游 snapshot 的逻辑和下游收到之前的 notifyCheckpointComplete
> > 之间是没有必然联系的,所以这个从理论上是不保证先后顺序的。
> > Best,
> > Congxian
> >
> >
> > key lou  于2020年8月16日周日 下午9:27写道:
> >
> > > 各位大佬:
> > >在如下代码中: FCombine  执行snapshot  collect 发送数据之后如果不执行sleep 则  FSubmit
> > > 在执行 notifyCheckpointComplete 方法时,list 集合 ls 为空。
> > > 如果在  FCombine  执行snapshot  collect 发送数据之后如果执行sleep,
> > > 在执行 notifyCheckpointComplete 方法时 则就可以收到  snapshot  collect 发送的数据。
> > > 我之前的理解是每个算子在执行完checkpoint 之后 才会把 barrier 广播到下游算子。 所以觉得下游无论如何应该在执行
> > > notifyCheckpointComplete 之前就会收到 上游 snapshot  collect 发送数据(因为 snapshot
> > >  collect 在前,广播 barrier  在后,然后下游在收到了 barrier  才会执行 chekpoint
> > > 的相关方法,所以在执行相关方法前 上游 snapshot  collect 发出的数据就应该已经到达了下游)。
> > > 但是根据如下代码的测试来看,不是这样的。请大佬帮忙解答下原因。
> > >
> > > public class FlinkCheckpointTest {
> > > public static void main(String[] args) throws Exception {
> > > StreamExecutionEnvironment steamEnv =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > steamEnv.enableCheckpointing(1000L*2);
> > > steamEnv
> > > .addSource(new FSource()).setParallelism(4)
> > > .transform("开始事务", Types.STRING,new
> > FStart()).setParallelism(1)
> > > .process(new FCombine()).name("事务预处理").setParallelism(4)
> > > .addSink(new FSubmit()).name("提交事务").setParallelism(1)
> > > ;
> > > steamEnv.execute("test");
> > > }
> > >
> > >static class FSource extends RichParallelSourceFunction{
> > > @Override
> > > public void run(SourceContext sourceContext) throws
> > > Exception {
> > > int I =0;
> > > while (true){
> > > I = I + 1;
> > > sourceContext.collect("thread " +
> > > Thread.currentThread().getId() +"-" +I);
> > > Thread.sleep(1000);
> > > }
> > > }
> > > @Override
> > > public void cancel() {}
> > > }
> > >
> > > static class FStart extends AbstractStreamOperator
> > > implements OneInputStreamOperator{
> > >volatile Long ckid = 0L;
> > > @Override
> > > public void processElement(StreamRecord streamRecord)
> > > throws Exception {
> > > log("收到数据: " + streamRecord.getValue() + "..ckid:" + ckid);
> > > output.collect(streamRecord);
> > > }
> > > @Override
> > > public void prepareSnapshotPreBarrier(long checkpointId)
> > > throws Exception {
> > > log("开启事务: " + checkpointId);
> > > ckid = checkpointId;
> > > super.prepareSnapshotPreBarrier(checkpointId);
> > > }
> > > }
> > >
> > > static class FCombine extends ProcessFunction
> > > implements CheckpointedFunction {
> > > List ls = new ArrayList();
> > > Collector collector =null;
> > > volatile Long ckid = 0L;
> > >
> > > @Override
> > > public void snapshotState(FunctionSnapshotContext
> > > functionSnapshotContext) throws Exception {
> > > StringBuffer sb = new StringBuffer();
> > > ls.forEach(x->{sb.append(x).append(";");});
> > > log("批处理 " + functionSnapshotContext.getCheckpointId() +
> > > ": 时收到数据:" + sb.toString());
> > > Thread.sleep(5*1000);
> > > collector.collect(sb.toString());
> > > ls.clear();
> > > Thread.sleep(5*1000);
> > > //Thread.sleep(20*1000);
> > > }
> > > @Override
> > > public void initializeState(FunctionInitializationContext
> > > functionInitializationContext) throws Exception {}
> > > @Override
> > > public void processElement(String s, Context context,
> > > Collector out) throws Exception {
> > > if(StringUtils.isNotBlank(s)){
> > > ls.add(s);
> > > }
> > > log("收到数据 :" + s + "; 这批数据大小为:" + ls.size() + "..ckid:" +
> > > ckid);
> > > if(collector ==null){
> > > collector = out;
> > > }
> > > }
> > > }
> > >
> > > static class FSubmit extends RichSinkFunction implements
> > > /*  CheckpointedFunction,*/ CheckpointListener {
> > > List ls = new ArrayList();
> > > volatile 

Re: 1.11 kafka producer 只往一个partition里写

2020-08-17 文章 x2009438
是的,我看了一下源码。

因为我用的是simplestringschema,不属于keyedschema。所以flinkfixedpartitioner的open方法被跳过了,没有初始化subtask
 index,所以全部是0%partition=0,于是都写到partition 0里去了。
我感觉怪怪的,不太合逻辑。

发自我的iPhone

>> 在 2020年8月17日,23:28,cs <58683...@qq.com> 写道:
> 目前Kafka 
> producer的partitioner使用的是org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner这个类
> 具体的分区方法是org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner#partition
> 由该方法可以得知你每个subtask发到哪个kafka的partition中。每个subtask的数据只会写到一个固定的partition里面。
> 
> 
> 
> 
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2020年8月17日(星期一) 晚上10:03
> 收件人:"user-zh" 
> 主题:1.11 kafka producer 只往一个partition里写
> 
> 
> 
> 我看1.11的kafka默认是用FlinkFixedPartitioner,应该是把sink每个subtask里的数据全部写到某一个partition吧?但是碰到了一个奇怪的事情:job整体并行度为5的情况下,数据结果只往partition
>  0里写。然后把partitoner用null替代之后是以roundrobin方式写入每一个partition。感到很奇怪啊。求助
> 
> 
> 
> 
> 发自我的iPhone
> 
> 
> 发自我的iPhone



Re: Print SQL connector无法正常使用

2020-08-17 文章 xiao cai
Hi china_tao:
你好,HBase肯定没有问题的,请问你可以正常使用print connector吗,能否让我看看正确的使用姿势,感谢


 原始邮件 
发件人: china_tao
收件人: user-zh
发送时间: 2020年8月17日(周一) 23:00
主题: Re: Print SQL connector无法正常使用


String createHbaseSql = CREATE TABLE dimension ( rowKey STRING, cf ROW, tas BIGINT ) WITH ( 'connector.type' = 'hbase', 
'connector.version' = '1.4.3', 'connector.table-name' = ’test', 
'connector.write.buffer-flush.max-rows' = '10', 'connector.zookeeper.quorum' = 
‘IP:port', 'connector.zookeeper.znode.parent' = '/hbase', ); 
tableEnv.executeSql(createHbaseSql); Table queryTable = 
tableEnv.sqlQuery("select * from dimension"); 
tableEnv.toAppendStream(queryTable, Row.class).print(); 
你先用这种方式,看看能不能打印出来,证明你hbase没有问题。然后在用print_table。 -- Sent from: 
http://apache-flink.147419.n8.nabble.com/

Re: flink1.11 mysql 分页查询

2020-08-17 文章 Leonard Xu
Hi
可以跟下这个issue[1], 在1.12会支持用于自定义query

Best
Leonard
https://issues.apache.org/jira/browse/FLINK-17826 


> 在 2020年8月18日,09:50,china_tao  写道:
> 
> 那肯定不行啊,我mysql表里面内容很多。FlinkSQL有没有直接分页查询的方法么?望赐教。类似于spark
> dataframe中的dbtable,万分感谢
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: flink1.11 mysql 分页查询

2020-08-17 文章 china_tao
那肯定不行啊,我mysql表里面内容很多。FlinkSQL有没有直接分页查询的方法么?望赐教。类似于spark
dataframe中的dbtable,万分感谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/

??????1.11 kafka producer ????????partition????

2020-08-17 文章 Matrix42
FlinkFixedPartitioner??partitionsubTask??partition??partitoner??null??kafka??partitoner??roundrobin




----
??: "x2009438"

Re: flink1.11 mysql 分页查询

2020-08-17 文章 Leonard Xu
Hi

> 在 2020年8月17日,20:46,china_tao  写道:
> 
> 您好,请教一个问题,flink sql 读取mysql如何分页查询。
> 在spark中,dataframe可以通过dbtable,传入分页查询的语句。
> val resultDF = session.read.format("jdbc")
>  .option("url",jdbcUrl)
>  .option("dbtable" , selectSql )
>  .option("user",user)
>  .options(writeOpts)
>  .option("password",password).load()
> 
> 在flink中,通过connector,会读取全表么?

会的,就是读取全表,connector就是读取全量表

> String insertSql = CREATE TABLE MyUserTable (
>  id BIGINT,
>  name STRING,
>  age INT,
>  status BOOLEAN,
>  PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>   'table-name' = 'users'
> );
> tableEnv.executeSql(insertSql);
> 以上的executesql会进行全表读取么?
> 还是执行了下面的sql,才会读取内容?
> String querysql = ”select * from MyUserTable limit 1 to 10“;

这个query Flink的query,从MyUserTable这张全量的表里筛选数据

Best
Leonard



回复:1.11 kafka producer 只往一个partition里写

2020-08-17 文章 cs
目前Kafka 
producer的partitioner使用的是org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner这个类
具体的分区方法是org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner#partition
由该方法可以得知你每个subtask发到哪个kafka的partition中。每个subtask的数据只会写到一个固定的partition里面。




--原始邮件--
发件人:
"user-zh"   
 

Re: flink sql在实时数仓中,关联hbase维表频繁变化的问题

2020-08-17 文章 shizk233
有没有可能把维表数据也作为数据流从kafka输入呢

Jim Chen  于2020年8月17日周一 下午4:36写道:

> 大家好:
> 我们现在在用flink sql在做实时数仓,架构大概是kafka关联hbase维表,然后写入clickhouse。hbase维表是频繁变化的
> 现在遇到的几个比较棘手的问题:
> 1、自己在实现AsyncTableFunction做异步io的时候,发现性能还是不够。后来就加入本地缓存,但是缓存一致性出现问题,不知道该如何解决
> 2、写入hbase的时候,是批量写的,无法保证有序,维表频繁变化的话,顺序不对,会造成结果有问题
> 3、hbase维表,可能5s后才会更新,但是此时kafka数据流已经过去了,关联的数据都是空
>
> 不知道,针对上面的场景,有什么好的解决思路或者方案
>


Re: 如何在KeyedProcessFunction中获取processingTime

2020-08-17 文章 shizk233
ctx.timestamp()其实就是获取的StreamRecord的时间戳,也就是事件被提取出来的时间戳。
这个方法一般需要使用event time,并且在数据流上assign过timestamp和watermark。

ゞ野蠻遊戲χ  于2020年8月16日周日 下午7:57写道:

> 大家好
>
>   
> 当我在KeyedProcessFunction的processElement方法中获取processingTime,就像这样ctx.timestamp(),返回为null,我改如何在processElement中获取processingTime?
>
>
> 谢谢!
> 嘉治


Re: Print SQL connector无法正常使用

2020-08-17 文章 china_tao
String createHbaseSql = CREATE TABLE dimension (
rowKey STRING,
cf ROW,
tas BIGINT
) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.table-name' = ’test',
'connector.write.buffer-flush.max-rows' = '10',
'connector.zookeeper.quorum' = ‘IP:port',
'connector.zookeeper.znode.parent' = '/hbase',
);
tableEnv.executeSql(createHbaseSql);
Table queryTable = tableEnv.sqlQuery("select * from dimension");
tableEnv.toAppendStream(queryTable, Row.class).print();

你先用这种方式,看看能不能打印出来,证明你hbase没有问题。然后在用print_table。 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink cdc能支持分库分表读取吗

2020-08-17 文章 china_tao
在数据库层面建view(view关联你的分库分表),然后flink操作这个view。就类似你用sqoop或者其它抽取工具的操作方式一样。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

1.11 kafka producer 只往一个partition里写

2020-08-17 文章 x2009438
我看1.11的kafka默认是用FlinkFixedPartitioner,应该是把sink每个subtask里的数据全部写到某一个partition吧?但是碰到了一个奇怪的事情:job整体并行度为5的情况下,数据结果只往partition
 0里写。然后把partitoner用null替代之后是以roundrobin方式写入每一个partition。感到很奇怪啊。求助




发自我的iPhone


发自我的iPhone

flink1.11 mysql 分页查询

2020-08-17 文章 china_tao
您好,请教一个问题,flink sql 读取mysql如何分页查询。在spark中,dataframe可以通过dbtable,传入分页查询的语句。val
resultDF = session.read.format("jdbc")  .option("url",jdbcUrl) 
.option("dbtable" , selectSql )  .option("user",user) 
.options(writeOpts) 
.option("password",password).load()在flink中,通过connector,会读取全表么?String
insertSql = CREATE TABLE MyUserTable (  id BIGINT,  name STRING,  age INT, 
status BOOLEAN,  PRIMARY KEY (id) NOT ENFORCED) WITH (   'connector' =
'jdbc',   'url' = 'jdbc:mysql://localhost:3306/mydatabase',   'table-name' =
'users');tableEnv.executeSql(insertSql);以上的executesql会进行全表读取么?还是执行了下面的sql,才会读取内容?String
querysql = ”select * from MyUserTable limit 1 to 10“;
tableEnv.sqlQuery(querySql);执行上看的sqlQuery才会真正的读取数据吧。问题比较简单,只是有点懵,不知道跟spark是否有区别。谢谢
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink1.11 mysql 分页查询

2020-08-17 文章 china_tao
您好,请教一个问题,flink sql 读取mysql如何分页查询。
在spark中,dataframe可以通过dbtable,传入分页查询的语句。
val resultDF = session.read.format("jdbc")
  .option("url",jdbcUrl)
  .option("dbtable" , selectSql )
  .option("user",user)
  .options(writeOpts)
  .option("password",password).load()

在flink中,通过connector,会读取全表么?
String insertSql = CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);
tableEnv.executeSql(insertSql);
以上的executesql会进行全表读取么?
还是执行了下面的sql,才会读取内容?
String querysql = ”select * from MyUserTable limit 1 to 10“;
 tableEnv.sqlQuery(querySql);
执行上看的sqlQuery才会真正的读取数据吧。

问题比较简单,只是有点懵,不知道跟spark是否有区别。
谢谢
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink1.11 mysql 分页查询

2020-08-17 文章 china_tao
您好,请教一个问题,flink sql 读取mysql如何分页查询。
在spark中,dataframe可以通过dbtable,传入分页查询的语句。
val resultDF = session.read.format("jdbc")
  .option("url",jdbcUrl)
  .option("dbtable" , selectSql )
  .option("user",user)
  .options(writeOpts)
  .option("password",password).load()

在flink中,通过connector,会读取全表么?
String insertSql = CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);
tableEnv.executeSql(insertSql);
以上的executesql会进行全表读取么?
还是执行了下面的sql,才会读取内容?
String querysql = ”select * from MyUserTable limit 1 to 10“;
 tableEnv.sqlQuery(querySql);
执行上看的sqlQuery才会真正的读取数据吧。

问题比较简单,只是有点懵,不知道跟spark是否有区别。
谢谢
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:flink-1.10.1 想用 DDL 入 ES5.6

2020-08-17 文章 kcz
tks.收到





-- 原始邮件 --
发件人: Yangze Guo https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3fgt;
 开始
 比如你自己实现了Elasticsearch5DynamicSink 


Re: PyFlink 中间表

2020-08-17 文章 Xingbo Huang
Hi,

支持

Best,
Xingbo

guaishushu1...@163.com  于2020年8月17日周一 下午7:55写道:

> 哪位大佬知道Flink 1.10 PyFlink支持中间表 或者支持这种写法吗
>   source = st_env.scan("source_kafka_ifang_dkt_log")
>   dim_table = source.select("`cluster`, `caller`, `cid`,`content`, `ip`
> `path`, `type`")
>   st_env.register_table('dim_table', dim_table)
>
>
>
>
> guaishushu1...@163.com
>


PyFlink 中间表

2020-08-17 文章 guaishushu1...@163.com
哪位大佬知道Flink 1.10 PyFlink支持中间表 或者支持这种写法吗
  source = st_env.scan("source_kafka_ifang_dkt_log")
  dim_table = source.select("`cluster`, `caller`, `cid`,`content`, `ip` `path`, 
`type`")
  st_env.register_table('dim_table', dim_table)




guaishushu1...@163.com


flink sql 数据异常导致任务失败

2020-08-17 文章 赵一旦
kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?

以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。

现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。


flink 1.10.1 ???????? OutOfMemoryError: Metaspace

2020-08-17 文章 ????
hi all
 flink 1.10.1 ??10??
2020-08-15 19:32:59
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:498)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
at sun.reflect.GeneratedMethodAccessor250.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:282)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:150)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory 
error has occurred. This can mean two things: either the job requires a larger 
size of JVM metaspace to load classes or there is a class loading leak. In the 
first case 'taskmanager.memory.jvm-metaspace.size' configuration option should 
be increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak which has to be 
investigated and fixed. The task executor has to be shutdown...

MemoryAnalyzer??dump 700m 70??m


??

Re: FlinkSQL tableEnv 依赖问题

2020-08-17 文章 Rui Li
没错,-C是需要每台机器上能访问到的

On Mon, Aug 17, 2020 at 5:56 PM 赵一旦  wrote:

> 不一样吧。-C 指定的是必须所有集群机器能访问的。sql-client指定的那个按照之前其他同学的说法是会被上传的。
>
> Rui Li  于2020年8月17日周一 下午5:40写道:
>
> > 对,这个文件每一行就是一个类名。另外你说的flink run指定额外的jar包应该可以通过-C参数来指定
> >
> > On Mon, Aug 17, 2020 at 5:38 PM 赵一旦  wrote:
> >
> > > 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?
> > >
> > > Rui Li  于2020年8月17日周一 下午5:36写道:
> > >
> > > > 用shade plugin的时候可以指定service resource
> > > > transformer,应该能把多个service文件merge起来。具体可以参考:
> > > >
> > > >
> > >
> >
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
> > > >
> > > > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦  wrote:
> > > >
> > > > >
> > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> > > > >
> > > > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> > > > >
> > > > > Rui Li  于2020年8月17日周一 下午3:46写道:
> > > > >
> > > > > > 可能是打fat
> > > > > >
> > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > > > > >
> > > > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
> > > > > >
> > > > > > > 代码如下:
> > > > > > > // tEnv;
> > > > > > > tEnv.sqlUpdate("create table dr1(  " +
> > > > > > > "  cid STRING,  " +
> > > > > > > "  server_time BIGINT,  " +
> > > > > > > "  d MAP,  " +
> > > > > > > "  process_time AS PROCTIME(),  " +
> > > > > > > "  event_time AS
> TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > > > > 1000)),
> > > > > > > " +
> > > > > > > "  WATERMARK FOR event_time AS event_time - INTERVAL
> '60'
> > > > > SECOND
> > > > > > > " +
> > > > > > > ") WITH (  " +
> > > > > > > "  'update-mode' = 'append',  " +
> > > > > > > "  'connector.type' = 'kafka',  " +
> > > > > > > "  'connector.version' = 'universal',  " +
> > > > > > > "  'connector.topic' = 'antibot_dr1',  " +
> > > > > > > "  'connector.startup-mode' = 'latest-offset',  " +
> > > > > > > "  'connector.properties.zookeeper.connect' =
> > > > > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > > > > > "  'connector.properties.bootstrap.servers' =
> > > > > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > > > > > "  'format.type' = 'json'  " +
> > > > > > > ")");
> > > > > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > > > > >
> > > > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > > > > >
> > > > > > > test.jar是个fat jar,相关依赖都有了。
> > > > > > >
> > > > > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > > > > >
> > > > > > > Caused by:
> > > > org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > > > > Could not find a suitable table factory for
> > > > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory'
> > in
> > > > > > > the classpath.
> > > > > > >
> > > > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > > > > >
> > > > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar
> 这个命令的机器
> > > > > > >
> > > > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > > > > >
> > > > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best regards!
> > > > > > Rui Li
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Best regards!
> > > > Rui Li
> > > >
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li


Re: flink-1.10.1 想用 DDL 入 ES5.6

2020-08-17 文章 Yangze Guo
社区已经在考虑废弃es5的支持了

Best,
Yangze Guo

On Mon, Aug 17, 2020 at 5:38 PM kcz <573693...@qq.com> wrote:
>
> 有一个小问题比较好奇,当时社区为什么没有提供ES5的sql功能,是处于什么考虑吗?因为已经提供了ES5 sinK的connect。
> --原始邮件--
> 发件人:  
>   "kcz"   
>  
> <573693...@qq.com;
> 发送时间:2020年8月17日(星期一) 上午8:34
> 收件人:"user-zh"
> 主题:回复:flink-1.10.1 想用 DDL 入 ES5.6
>
>
>
> 谢谢大佬 我先研究研究
>
>
>
>
>
> -- 原始邮件 --
> 发件人: Leonard Xu  发送时间: 2020年8月14日 10:25
> 收件人: user-zh  主题: 回复:flink-1.10.1 想用 DDL 入 ES5.6
>
>
>
> Hi,
> 我贴的链接里有对应的PR[1], 你可以看看这个PR里的代码,代码入口可以从 Elasticsearch6DynamicSink.java 
>   开始
> 比如你自己实现了Elasticsearch5DynamicSink 
>   一套后,再打一个 es5 的sql jar 就好了。
>
> 祝好
> Leonard
> [1] https://github.com/apache/flink/pull/12184 
> 
>
>  在 2020年8月14日,10:14,kcz <573693...@qq.com 写道:
> 
>  查看您说的[1]的url之后,发现里面并没有跟 es sql jar有关的。
> 
> 
> 
>  --
>  Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FlinkSQL tableEnv 依赖问题

2020-08-17 文章 赵一旦
不一样吧。-C 指定的是必须所有集群机器能访问的。sql-client指定的那个按照之前其他同学的说法是会被上传的。

Rui Li  于2020年8月17日周一 下午5:40写道:

> 对,这个文件每一行就是一个类名。另外你说的flink run指定额外的jar包应该可以通过-C参数来指定
>
> On Mon, Aug 17, 2020 at 5:38 PM 赵一旦  wrote:
>
> > 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?
> >
> > Rui Li  于2020年8月17日周一 下午5:36写道:
> >
> > > 用shade plugin的时候可以指定service resource
> > > transformer,应该能把多个service文件merge起来。具体可以参考:
> > >
> > >
> >
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
> > >
> > > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦  wrote:
> > >
> > > >
> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> > > >
> > > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> > > >
> > > >
> > > >
> > >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> > > >
> > > > Rui Li  于2020年8月17日周一 下午3:46写道:
> > > >
> > > > > 可能是打fat
> > > > >
> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > > > >
> > > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
> > > > >
> > > > > > 代码如下:
> > > > > > // tEnv;
> > > > > > tEnv.sqlUpdate("create table dr1(  " +
> > > > > > "  cid STRING,  " +
> > > > > > "  server_time BIGINT,  " +
> > > > > > "  d MAP,  " +
> > > > > > "  process_time AS PROCTIME(),  " +
> > > > > > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > > > 1000)),
> > > > > > " +
> > > > > > "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> > > > SECOND
> > > > > > " +
> > > > > > ") WITH (  " +
> > > > > > "  'update-mode' = 'append',  " +
> > > > > > "  'connector.type' = 'kafka',  " +
> > > > > > "  'connector.version' = 'universal',  " +
> > > > > > "  'connector.topic' = 'antibot_dr1',  " +
> > > > > > "  'connector.startup-mode' = 'latest-offset',  " +
> > > > > > "  'connector.properties.zookeeper.connect' =
> > > > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > > > > "  'connector.properties.bootstrap.servers' =
> > > > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > > > > "  'format.type' = 'json'  " +
> > > > > > ")");
> > > > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > > > >
> > > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > > > >
> > > > > > test.jar是个fat jar,相关依赖都有了。
> > > > > >
> > > > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > > > >
> > > > > > Caused by:
> > > org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > > > Could not find a suitable table factory for
> > > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory'
> in
> > > > > > the classpath.
> > > > > >
> > > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > > > >
> > > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > > > > >
> > > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > > > >
> > > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best regards!
> > > > > Rui Li
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


sql client 并行度问题

2020-08-17 文章 18579099...@163.com
sql client 读取hive表,hive表中一共有21个文件需要读取,sql 
client提示需要21个并行度,但是我slot并没有这么多。有什么办法可以把并行度改小?



18579099...@163.com


Re: FlinkSQL tableEnv 依赖问题

2020-08-17 文章 Rui Li
对,这个文件每一行就是一个类名。另外你说的flink run指定额外的jar包应该可以通过-C参数来指定

On Mon, Aug 17, 2020 at 5:38 PM 赵一旦  wrote:

> 哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?
>
> Rui Li  于2020年8月17日周一 下午5:36写道:
>
> > 用shade plugin的时候可以指定service resource
> > transformer,应该能把多个service文件merge起来。具体可以参考:
> >
> >
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
> >
> > On Mon, Aug 17, 2020 at 5:00 PM 赵一旦  wrote:
> >
> > > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> > >
> > > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> > >
> > >
> > >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> > >
> > > Rui Li  于2020年8月17日周一 下午3:46写道:
> > >
> > > > 可能是打fat
> > > > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > > >
> > > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
> > > >
> > > > > 代码如下:
> > > > > // tEnv;
> > > > > tEnv.sqlUpdate("create table dr1(  " +
> > > > > "  cid STRING,  " +
> > > > > "  server_time BIGINT,  " +
> > > > > "  d MAP,  " +
> > > > > "  process_time AS PROCTIME(),  " +
> > > > > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > > 1000)),
> > > > > " +
> > > > > "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> > > SECOND
> > > > > " +
> > > > > ") WITH (  " +
> > > > > "  'update-mode' = 'append',  " +
> > > > > "  'connector.type' = 'kafka',  " +
> > > > > "  'connector.version' = 'universal',  " +
> > > > > "  'connector.topic' = 'antibot_dr1',  " +
> > > > > "  'connector.startup-mode' = 'latest-offset',  " +
> > > > > "  'connector.properties.zookeeper.connect' =
> > > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > > > "  'connector.properties.bootstrap.servers' =
> > > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > > > "  'format.type' = 'json'  " +
> > > > > ")");
> > > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > > >
> > > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > > >
> > > > > test.jar是个fat jar,相关依赖都有了。
> > > > >
> > > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > > >
> > > > > Caused by:
> > org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > > Could not find a suitable table factory for
> > > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > > > > the classpath.
> > > > >
> > > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > > >
> > > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > > > >
> > > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > > >
> > > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > > >
> > > >
> > > >
> > > > --
> > > > Best regards!
> > > > Rui Li
> > > >
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li


??????flink-1.10.1 ???? DDL ?? ES5.6

2020-08-17 文章 kcz
ES5??sql??ES5
 sinK??connect??
----
??: 
   "kcz"
<573693...@qq.com;
:2020??8??17??(??) 8:34
??:"user-zh"https://github.com/apache/flink/pull/12184/files#diff-c71784e07a5a5c214a2f4f9843e6ef3f;
 
Elasticsearch5DynamicSink 


Re: FlinkSQL tableEnv 依赖问题

2020-08-17 文章 赵一旦
哦哦。也就是说本身这个文件内内容是支持一行一个这样的格式的是吧?

Rui Li  于2020年8月17日周一 下午5:36写道:

> 用shade plugin的时候可以指定service resource
> transformer,应该能把多个service文件merge起来。具体可以参考:
>
> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
>
> On Mon, Aug 17, 2020 at 5:00 PM 赵一旦  wrote:
>
> > @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
> >
> > 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
> >
> >
> >
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
> >
> > Rui Li  于2020年8月17日周一 下午3:46写道:
> >
> > > 可能是打fat
> > > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> > >
> > > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
> > >
> > > > 代码如下:
> > > > // tEnv;
> > > > tEnv.sqlUpdate("create table dr1(  " +
> > > > "  cid STRING,  " +
> > > > "  server_time BIGINT,  " +
> > > > "  d MAP,  " +
> > > > "  process_time AS PROCTIME(),  " +
> > > > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> > 1000)),
> > > > " +
> > > > "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> > SECOND
> > > > " +
> > > > ") WITH (  " +
> > > > "  'update-mode' = 'append',  " +
> > > > "  'connector.type' = 'kafka',  " +
> > > > "  'connector.version' = 'universal',  " +
> > > > "  'connector.topic' = 'antibot_dr1',  " +
> > > > "  'connector.startup-mode' = 'latest-offset',  " +
> > > > "  'connector.properties.zookeeper.connect' =
> > > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > > "  'connector.properties.bootstrap.servers' =
> > > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > > "  'format.type' = 'json'  " +
> > > > ")");
> > > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > > >
> > > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > > >
> > > > test.jar是个fat jar,相关依赖都有了。
> > > >
> > > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > > >
> > > > Caused by:
> org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > > Could not find a suitable table factory for
> > > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > > > the classpath.
> > > >
> > > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > > >
> > > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > > >
> > > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > > >
> > > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


答复: flink sql在实时数仓中,关联hbase维表频繁变化的问题

2020-08-17 文章 zhao liang
第三条我有个大概的想法,kafka数据把原有时间戳减去一个你觉得足够的延迟时间,生成一个新的时间戳,flink用这个时间做watermark,原有时间保留用来和habse进行匹配。

发件人: Jim Chen 
日期: 星期一, 2020年8月17日 16:36
收件人: user-zh 
主题: flink sql在实时数仓中,关联hbase维表频繁变化的问题
大家好:
我们现在在用flink sql在做实时数仓,架构大概是kafka关联hbase维表,然后写入clickhouse。hbase维表是频繁变化的
现在遇到的几个比较棘手的问题:
1、自己在实现AsyncTableFunction做异步io的时候,发现性能还是不够。后来就加入本地缓存,但是缓存一致性出现问题,不知道该如何解决
2、写入hbase的时候,是批量写的,无法保证有序,维表频繁变化的话,顺序不对,会造成结果有问题
3、hbase维表,可能5s后才会更新,但是此时kafka数据流已经过去了,关联的数据都是空

不知道,针对上面的场景,有什么好的解决思路或者方案


Re: FlinkSQL tableEnv 依赖问题

2020-08-17 文章 赵一旦
或者通过 flink run 方式运行的任务,能否像 sql-client.sh 那样通过-l,-j指定的jar也会被上传到集群呢?

赵一旦  于2020年8月17日周一 下午5:34写道:

> 小伙伴们,帮忙看下怎么解决呢? 通过写代码方式,基于table
> api执行flink的sql。这种情况下用到的flink-json等包通过shade等方式做成一个大jar包之后依赖的问题。
>
> 赵一旦  于2020年8月17日周一 下午5:00写道:
>
>> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
>>
>> 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
>>
>>
>> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
>>
>> Rui Li  于2020年8月17日周一 下午3:46写道:
>>
>>> 可能是打fat
>>> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
>>>
>>> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
>>>
>>> > 代码如下:
>>> > // tEnv;
>>> > tEnv.sqlUpdate("create table dr1(  " +
>>> > "  cid STRING,  " +
>>> > "  server_time BIGINT,  " +
>>> > "  d MAP,  " +
>>> > "  process_time AS PROCTIME(),  " +
>>> > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
>>> 1000)),
>>> > " +
>>> > "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
>>> SECOND
>>> > " +
>>> > ") WITH (  " +
>>> > "  'update-mode' = 'append',  " +
>>> > "  'connector.type' = 'kafka',  " +
>>> > "  'connector.version' = 'universal',  " +
>>> > "  'connector.topic' = 'antibot_dr1',  " +
>>> > "  'connector.startup-mode' = 'latest-offset',  " +
>>> > "  'connector.properties.zookeeper.connect' =
>>> > 'yq01-sw-xxx03.yq01:8681',  " +
>>> > "  'connector.properties.bootstrap.servers' =
>>> > 'yq01-sw-xxx03.yq01:8192',  " +
>>> > "  'format.type' = 'json'  " +
>>> > ")");
>>> > Table t1 = tEnv.sqlQuery("select * from dr1");
>>> >
>>> > 我打包会把flink-json打包进去,最终结果包是test.jar。
>>> >
>>> > test.jar是个fat jar,相关依赖都有了。
>>> >
>>> > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
>>> >
>>> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>> > Could not find a suitable table factory for
>>> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
>>> > the classpath.
>>> >
>>> > 可是我flink-json.jar都打包进去了,居然还是报错。。。
>>> >
>>> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
>>> >
>>> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
>>> >
>>> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
>>> >
>>>
>>>
>>> --
>>> Best regards!
>>> Rui Li
>>>
>>


Re: FlinkSQL tableEnv 依赖问题

2020-08-17 文章 Rui Li
用shade plugin的时候可以指定service resource
transformer,应该能把多个service文件merge起来。具体可以参考:
https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer

On Mon, Aug 17, 2020 at 5:00 PM 赵一旦  wrote:

> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
>
> 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
>
>
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
>
> Rui Li  于2020年8月17日周一 下午3:46写道:
>
> > 可能是打fat
> > jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
> >
> > On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
> >
> > > 代码如下:
> > > // tEnv;
> > > tEnv.sqlUpdate("create table dr1(  " +
> > > "  cid STRING,  " +
> > > "  server_time BIGINT,  " +
> > > "  d MAP,  " +
> > > "  process_time AS PROCTIME(),  " +
> > > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
> 1000)),
> > > " +
> > > "  WATERMARK FOR event_time AS event_time - INTERVAL '60'
> SECOND
> > > " +
> > > ") WITH (  " +
> > > "  'update-mode' = 'append',  " +
> > > "  'connector.type' = 'kafka',  " +
> > > "  'connector.version' = 'universal',  " +
> > > "  'connector.topic' = 'antibot_dr1',  " +
> > > "  'connector.startup-mode' = 'latest-offset',  " +
> > > "  'connector.properties.zookeeper.connect' =
> > > 'yq01-sw-xxx03.yq01:8681',  " +
> > > "  'connector.properties.bootstrap.servers' =
> > > 'yq01-sw-xxx03.yq01:8192',  " +
> > > "  'format.type' = 'json'  " +
> > > ")");
> > > Table t1 = tEnv.sqlQuery("select * from dr1");
> > >
> > > 我打包会把flink-json打包进去,最终结果包是test.jar。
> > >
> > > test.jar是个fat jar,相关依赖都有了。
> > >
> > > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> > >
> > > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> > > Could not find a suitable table factory for
> > > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > > the classpath.
> > >
> > > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> > >
> > > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> > >
> > > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> > >
> > > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li


Re: FlinkSQL tableEnv 依赖问题

2020-08-17 文章 赵一旦
小伙伴们,帮忙看下怎么解决呢? 通过写代码方式,基于table
api执行flink的sql。这种情况下用到的flink-json等包通过shade等方式做成一个大jar包之后依赖的问题。

赵一旦  于2020年8月17日周一 下午5:00写道:

> @RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。
>
> 所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?
>
>
> 此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。
>
> Rui Li  于2020年8月17日周一 下午3:46写道:
>
>> 可能是打fat
>> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
>>
>> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
>>
>> > 代码如下:
>> > // tEnv;
>> > tEnv.sqlUpdate("create table dr1(  " +
>> > "  cid STRING,  " +
>> > "  server_time BIGINT,  " +
>> > "  d MAP,  " +
>> > "  process_time AS PROCTIME(),  " +
>> > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time /
>> 1000)),
>> > " +
>> > "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
>> > " +
>> > ") WITH (  " +
>> > "  'update-mode' = 'append',  " +
>> > "  'connector.type' = 'kafka',  " +
>> > "  'connector.version' = 'universal',  " +
>> > "  'connector.topic' = 'antibot_dr1',  " +
>> > "  'connector.startup-mode' = 'latest-offset',  " +
>> > "  'connector.properties.zookeeper.connect' =
>> > 'yq01-sw-xxx03.yq01:8681',  " +
>> > "  'connector.properties.bootstrap.servers' =
>> > 'yq01-sw-xxx03.yq01:8192',  " +
>> > "  'format.type' = 'json'  " +
>> > ")");
>> > Table t1 = tEnv.sqlQuery("select * from dr1");
>> >
>> > 我打包会把flink-json打包进去,最终结果包是test.jar。
>> >
>> > test.jar是个fat jar,相关依赖都有了。
>> >
>> > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
>> >
>> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>> > Could not find a suitable table factory for
>> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
>> > the classpath.
>> >
>> > 可是我flink-json.jar都打包进去了,居然还是报错。。。
>> >
>> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
>> >
>> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
>> >
>> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
>> >
>>
>>
>> --
>> Best regards!
>> Rui Li
>>
>


Re: PyFlink 写入ES

2020-08-17 文章 Xingbo Huang
Hi,

其实报错信息已经说清楚了你用的方式的问题了,这个host方法是需要传入三个参数的,第一个是你的hostname,第二个是你的port,第三个是你使用的protocol。
可以采用下面这种方式试一下
.host("es9223.db.58dns.org", 9223, "http")

Best,
Xingbo

guaishushu1...@163.com  于2020年8月17日周一 下午5:12写道:

> PyFlink 从kafka写入ES 抛这个异常,但是host是正确的有哪位知道吗
> File "main-0-8.py", line 74, in 
> .host("http://es9223.db.58dns.org:9223;)
> TypeError: host() missing 2 required positional arguments: 'port' and
> 'protocol'
>
>
>
>
> guaishushu1...@163.com
>


PyFlink 写入ES

2020-08-17 文章 guaishushu1...@163.com
PyFlink 从kafka写入ES 抛这个异常,但是host是正确的有哪位知道吗
File "main-0-8.py", line 74, in 
.host("http://es9223.db.58dns.org:9223;)
TypeError: host() missing 2 required positional arguments: 'port' and 'protocol'




guaishushu1...@163.com


Re: FlinkSQL tableEnv 依赖问题

2020-08-17 文章 赵一旦
@RuiLi,你这个说法对的。我刚刚看了下,是这样的。因为我是shade掉的,倒是没丢失,但是多种connector和format相互覆盖了。

所以说,这就是问题了。flink-json,flink-csv这种包没办法shade进去看样子?

此外,如果我实现一个jar内同时提供flink-json和flink-csv的功能是不是当前这种机制也无法实现(为什么要依靠services目录下一个固定名字的文件来记录Factory实现类,这貌似就是导致这个问题的点)。

Rui Li  于2020年8月17日周一 下午3:46写道:

> 可能是打fat
> jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现
>
> On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:
>
> > 代码如下:
> > // tEnv;
> > tEnv.sqlUpdate("create table dr1(  " +
> > "  cid STRING,  " +
> > "  server_time BIGINT,  " +
> > "  d MAP,  " +
> > "  process_time AS PROCTIME(),  " +
> > "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)),
> > " +
> > "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
> > " +
> > ") WITH (  " +
> > "  'update-mode' = 'append',  " +
> > "  'connector.type' = 'kafka',  " +
> > "  'connector.version' = 'universal',  " +
> > "  'connector.topic' = 'antibot_dr1',  " +
> > "  'connector.startup-mode' = 'latest-offset',  " +
> > "  'connector.properties.zookeeper.connect' =
> > 'yq01-sw-xxx03.yq01:8681',  " +
> > "  'connector.properties.bootstrap.servers' =
> > 'yq01-sw-xxx03.yq01:8192',  " +
> > "  'format.type' = 'json'  " +
> > ")");
> > Table t1 = tEnv.sqlQuery("select * from dr1");
> >
> > 我打包会把flink-json打包进去,最终结果包是test.jar。
> >
> > test.jar是个fat jar,相关依赖都有了。
> >
> > 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
> >
> > Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> > Could not find a suitable table factory for
> > 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> > the classpath.
> >
> > 可是我flink-json.jar都打包进去了,居然还是报错。。。
> >
> > 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
> >
> > 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
> >
> > 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
> >
>
>
> --
> Best regards!
> Rui Li
>


Re: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-08-17 文章 Jim Chen
hbase维表的数据量,大概500G

Dream-底限  于2020年8月13日周四 下午12:16写道:

> flink暴漏的lookup
>
> 是支持设置缓存记录条数和缓存时间的吧,把时间和条数设置的小一点或者直接禁用缓存,如果流表数据量不大的话可以不用异步访问,数据量大的话异步访问不加缓存维表存储引擎可能压力过大
>
> Jim Chen  于2020年8月13日周四 上午11:53写道:
>
> > 请问下,如果使用了localcache+asyncIO的方式,缓存一致性,有什么解决的思路吗?维表的状态是频繁更新的
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: TableColumn为啥不包含comment

2020-08-17 文章 Shengkai Fang
hi, 那请你在那个jira留一下言,我会把这个分配给你。

Harold.Miao  于2020年8月17日周一 上午11:26写道:

> 谢谢   我想提交这个patch
>
> Shengkai Fang  于2020年8月14日周五 下午4:33写道:
>
> > hi, 我已经建了一个issue[1]跟踪这个情况,有兴趣的话可以帮忙修复下这个bug。
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-18958
> >
> > Harold.Miao  于2020年8月13日周四 上午11:08写道:
> >
> > > hi all
> > > 我发现TableColumn class不包含column comment  , 给开发带来了一点麻烦,请教大家一下,谢谢
> > >
> > >
> > > --
> > >
> > > Best Regards,
> > > Harold Miao
> > >
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>


flink sql在实时数仓中,关联hbase维表频繁变化的问题

2020-08-17 文章 Jim Chen
大家好:
我们现在在用flink sql在做实时数仓,架构大概是kafka关联hbase维表,然后写入clickhouse。hbase维表是频繁变化的
现在遇到的几个比较棘手的问题:
1、自己在实现AsyncTableFunction做异步io的时候,发现性能还是不够。后来就加入本地缓存,但是缓存一致性出现问题,不知道该如何解决
2、写入hbase的时候,是批量写的,无法保证有序,维表频繁变化的话,顺序不对,会造成结果有问题
3、hbase维表,可能5s后才会更新,但是此时kafka数据流已经过去了,关联的数据都是空

不知道,针对上面的场景,有什么好的解决思路或者方案


flink cdc能支持分库分表读取吗

2020-08-17 文章 18579099...@163.com
如题,我有一个需求是将分库分表放在一张表里进行统计(我们的16个分库16个分表共计256张表合成一张表),flink现在支持吗



18579099...@163.com


Re: flink 1.11 SQL idea调试无数据也无报错

2020-08-17 文章 DanielGu
hi,
flink 1.11 SQL idea调试 有其他伙伴了解吗?求赐教.
最近调试卡在这里..有点出不来了
十分感谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FlinkSQL tableEnv 依赖问题

2020-08-17 文章 Rui Li
可能是打fat
jar的时候service文件被覆盖了?可以检查一下你jar包里面META-INF/services下面的文件,看是不是没有json的实现

On Fri, Aug 14, 2020 at 7:13 PM 赵一旦  wrote:

> 代码如下:
> // tEnv;
> tEnv.sqlUpdate("create table dr1(  " +
> "  cid STRING,  " +
> "  server_time BIGINT,  " +
> "  d MAP,  " +
> "  process_time AS PROCTIME(),  " +
> "  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)),
> " +
> "  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
> " +
> ") WITH (  " +
> "  'update-mode' = 'append',  " +
> "  'connector.type' = 'kafka',  " +
> "  'connector.version' = 'universal',  " +
> "  'connector.topic' = 'antibot_dr1',  " +
> "  'connector.startup-mode' = 'latest-offset',  " +
> "  'connector.properties.zookeeper.connect' =
> 'yq01-sw-xxx03.yq01:8681',  " +
> "  'connector.properties.bootstrap.servers' =
> 'yq01-sw-xxx03.yq01:8192',  " +
> "  'format.type' = 'json'  " +
> ")");
> Table t1 = tEnv.sqlQuery("select * from dr1");
>
> 我打包会把flink-json打包进去,最终结果包是test.jar。
>
> test.jar是个fat jar,相关依赖都有了。
>
> 然后我执行:flink run -c test.SQLWC1 --detached  test.jar 报错:
>
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
>
> 可是我flink-json.jar都打包进去了,居然还是报错。。。
>
> 解决方式,必须是执行 flink run -c test.SQLWC1 --detached  test.jar 这个命令的机器
>
> 上的flink的环境中有flink-json这个包。但实际上这个机器只作为提交,实际执行任务的集群是另一个机器。
>
> 搞不懂,FlinkSQL找依赖的过程到底啥情况,我fat jar打包进去的flink-json不会被考虑吗?
>


-- 
Best regards!
Rui Li


?????? ??????????

2020-08-17 文章 zhiyezou
Hi


??1%??














----
??: 
   "user-zh"



Re: flink1.9.1用采用-d(分离模式提交)作业报错,但是不加-d是可以正常跑的

2020-08-17 文章 bradyMk
您好:
我没有尝试过新版本,但是觉得好像不是版本的问题,因为我其他所有flink作业加上-d都能正常运行,就这个不行,并且如果我不用(-d)提交,这个也是可以运行的。我也很奇怪



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: (无主题)

2020-08-17 文章 art
hi,感谢提供的方案

是的,day是订单的创建时间

想请教下,你们的离线任务是每次都将全量订单数据一起修正吗,就是不管历史的有没有变化 都去作修正,

要是这样那会不会出现绝大部分情况,离线跑的任务都是无效的,因为历史数据未发生变化

> 在 2020年8月17日,下午1:22,zhiyezou <1530130...@qq.com> 写道:
> 
> HI
> 
> 
> 这个day应该是订单的创建时间吧
> 
> 
> 我觉得我们遇到的问题有些类似,看下我们的方案对你是否有所帮助。
> 
> 
> 首先,我们会把day这个条件控制在3天(select * where day 
> now-3),状态的TTL也是3天,即flink保留3天的状态,这样即使有3天前的数据到来也不会更新我们的结果表;这样可以解决更新错误的问题。
> 
> 
> 然后,通过离线任务来定时修正3天前的结果数据。这样可以保证数据的最终一致性
> 
> 
> 
> 
> 
> 
> --原始邮件--
> 发件人:  
>   "user-zh"   
>  
>  发送时间:2020年8月17日(星期一) 中午12:32
> 收件人:"user-zh@flink.apache.org" 
> 主题:(无主题)
> 
> 
> 
> hi,社区的小伙伴,大家好!我有一个应用场景,想请教下大家有没有遇过,有什么好的方案。
> 场景就是:按照user和day的维度统计订单表里的有效订单数,同时存在历史的订单状态随时可能被更新,比如可能当前把2个月前的订单状态置未true,所以没法根据历史结果预统计,翻译称sql就是select
>  user,day,count(*) from table where state = true group by 
> user,day;目前我已经用flink-sql-cdc-connector实现了,但是有一个问题就是state,因为按user day组合 
> 那么如果全部状态都保存后期回越来越大,但是如果设置ttl,那么如果历史订单变化,最终更新出去的值也不对。 
> 希望社区的小伙伴给我出出主意