Re: Re: Flink SQL job state size keeps growing

2020-08-14 Thread sunfulin
hi, benchao,
Thanks for the reply. So can I take it that once mini-batch is removed, expired state will be cleaned up?

At 2020-08-14 14:09:33, "Benchao Li" wrote:
>Hi,
>Group aggregation + mini-batch does not support state TTL cleanup yet; there is already work [1] underway to fix this.
>
>[1] https://issues.apache.org/jira/browse/FLINK-17096
>
>sunfulin wrote on Fri, Aug 14, 2020, at 2:06 PM:
>
>> hi, one of my Flink SQL jobs has idle state retention time enabled, but the state
>> size shown on the web UI keeps growing, and it does not shrink even after the maximum retention time has passed. What could be the cause?
>>
>>
>> Flink version: Flink 1.10.1. State TTL configuration:
>> tableEnv.getConfig.setIdleStateRetentionTime(Time.minutes(5),
>> Time.minutes(10));
>> The job logic is to keep, for each userId, the first record seen each day, roughly: select userId, first_value(xxx) from
>> source group by userId, date_format(eventtime, '-MM-dd');
>
>
>
>-- 
>
>Best,
>Benchao Li


Re: Re: Flink SQL job state size keeps growing

2020-08-14 Thread Benchao Li
Yes.

sunfulin wrote on Fri, Aug 14, 2020, at 3:01 PM:

> hi, benchao,
> Thanks for the reply. So can I take it that once mini-batch is removed, expired state will be cleaned up?


-- 

Best,
Benchao Li
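
For reference, a minimal sketch of the workaround confirmed above, assuming the blink planner in Flink 1.10/1.11 (the mini-batch option key is the standard table config key; source/sink registration and the actual query are omitted): keep idle state retention enabled and switch mini-batch off so that expired group-aggregation state can be cleaned up, until FLINK-17096 lands.

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DisableMiniBatchSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // idle state retention (min, max), as in the original mail
        tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(5), Time.minutes(10));

        // make sure mini-batch is off: group agg + mini-batch does not expire state yet (FLINK-17096)
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "false");

        // ... register the Kafka source / sink and run the first_value aggregation as in the mail ...
    }
}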


Re: Flink 1.11 SQL debugging in IDEA: no data and no errors

2020-08-14 Thread DanielGu
hi,

I did see many mentions in the mailing list archives that executeSql is an asynchronous API, but I still have some doubts about this part.


1. When the INSERT INTO logic is simple, I can see the code producing output, but after replacing it with the aggregating INSERT INTO SQL from my original mail,
no output is shown anymore. Why?
Code:
...
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);

tEnv.executeSql("INSERT INTO print_sink SELECT  user_id
,item_id,category_id ,behavior ,ts,proctime FROM user_behavior");
...
Console output:
3>
+I(1014646,2869046,4022701,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3> +I(105950,191177,3975787,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3>
+I(128322,5013356,4066962,buy,2017-11-27T00:38:15,2020-08-14T08:20:23.847)
3> +I(225652,3487948,2462567,pv,2017-11-27T00:38:15,2020-08-14T08:20:23.847)

Aggregation code (source unchanged; the sink columns changed accordingly):
> String transformationDDL= "INSERT INTO buy_cnt_per_hour\n" +
> "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)) as
> hour_of_day , COUNT(*) as buy_cnt\n" +
> "FROM user_behavior\n" +
> "WHERE behavior = 'buy'\n" +
> "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)";
>
>
>
> // register the source and sink
> tEnv.executeSql(sourceDDL);
> tEnv.executeSql(sinkDDL);
> //tableResult.print();
>
>tEnv.executeSql(transformationDDL);

2. I didn't quite understand your suggestion to manually take the TableResult returned by executeSql and then wait for the job to finish.
I changed the code as follows, but the console still prints no results.
// register the source and sink
tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);

TableResult tableResult = tEnv.executeSql(transformationDDL);

tableResult.getJobClient()
.get()
   
.getJobExecutionResult(Thread.currentThread().getContextClassLoader())
.get().wait();

Best,
DanielGu



--
Sent from: http://apache-flink.147419.n8.nabble.com/
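
For reference, a minimal runnable sketch of the "wait for the job" pattern being discussed, assuming Flink 1.11 (the datagen/print tables are hypothetical stand-ins for the Kafka source and print sink above): executeSql submits the INSERT asynchronously, and blocking on the CompletableFuture returned by getJobExecutionResult is enough; the extra Object.wait() call in the snippet above is not needed and, called without holding the object's monitor, would throw IllegalMonitorStateException.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class BlockingSubmitSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // hypothetical bounded source and print sink, just to keep the sketch self-contained
        tEnv.executeSql("CREATE TABLE src (v BIGINT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        tEnv.executeSql("CREATE TABLE snk (v BIGINT) WITH ('connector' = 'print')");

        // executeSql submits the INSERT job asynchronously and returns a TableResult immediately
        TableResult result = tEnv.executeSql("INSERT INTO snk SELECT v FROM src");

        // block until the job finishes; get() on the returned future is the blocking call
        result.getJobClient()
              .get()
              .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
              .get();
    }
}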


Re: Why doesn't TableColumn include the comment?

2020-08-14 Thread Shengkai Fang
hi, I have created an issue [1] to track this; if you're interested, feel free to help fix the bug.

[1] https://issues.apache.org/jira/browse/FLINK-18958

Harold.Miao wrote on Thu, Aug 13, 2020, at 11:08 AM:

> hi all
> I noticed that the TableColumn class does not contain the column comment, which is a bit inconvenient for development. Could anyone advise? Thanks.
>
>
> --
>
> Best Regards,
> Harold Miao
>


Re: How to set parallelism for Flink SQL

2020-08-14 Thread 赵一旦
There's a conclusion now. It seems this is simply not supported through the sql-client. You need to write Java/Scala code and submit the SQL through a TableEnvironment; in that case you only need to configure checkpointing on the env.
Since such a job is also submitted with the flink command, you can likewise trigger savepoints with flink, or start the job from a checkpoint.

赵一旦 wrote on Fri, Aug 14, 2020, at 12:03 PM:

> What about checkpoints? For most of you running Flink SQL, are your jobs the kind that can be restarted at any time, rather than requiring an uninterrupted level of accuracy?
>
> Xingbo Huang wrote on Fri, Aug 14, 2020, at 12:01 PM:
>
>> Hi,
>>
>> Regarding parallelism: as far as I know, the Table API currently has no way to set the parallelism of each operator individually.
>>
>> Best,
>> Xingbo
>>
>> Zhao,Yi(SEC) wrote on Fri, Aug 14, 2020, at 10:49 AM:
>>
>> > Could anyone help with the parallelism question? A related follow-up: besides parallelism, can Flink SQL take checkpoints/savepoints and restart the SQL job from a checkpoint/savepoint?
>> >
>> > From: "Zhao,Yi(SEC)"
>> > Date: Thursday, Aug 13, 2020, 11:44 AM
>> > To: "user-zh@flink.apache.org"
>> > Subject: How to set parallelism for Flink SQL
>> >
>> > The configuration file has execution.parallelism, but that is clearly a global setting.
>> > How can the source, window, and sink operators generated from the SQL be given different parallelism?
>> >
>> > For example, the source parallelism should ideally match the number of Kafka partitions, the window operator needs to account for computational load based on the data volume, and the sink has its own scenario-specific considerations.
>> >
>> >
>>
>
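
For reference, a minimal sketch of the approach summarized above, assuming Flink 1.11's Java Table API (the datagen/print tables are placeholders for the real Kafka source and sink): checkpointing is configured on the underlying StreamExecutionEnvironment, and the resulting program is packaged and submitted with flink run, so savepoints can then be triggered and restored through the regular flink CLI. Per-operator parallelism still cannot be set for the SQL-generated operators; only the global parallelism applies.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlJobWithCheckpoints {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);   // checkpoint every 60s
        env.setParallelism(4);             // global parallelism for the whole SQL pipeline

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // hypothetical DDL and INSERT; replace with the real Kafka source, window query and sink
        tEnv.executeSql("CREATE TABLE src (v BIGINT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE snk (v BIGINT) WITH ('connector' = 'print')");
        tEnv.executeSql("INSERT INTO snk SELECT v FROM src");
    }
}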


Re: Problem with writing ORC files via Hive streaming

2020-08-14 Thread JasonLee
hi

I tested ORC on my side. Just change stored as parquet to stored as
orc: the _SUCCESS file is generated and the data can be queried in Hive. One problem, though, is that the numbers shown on the Flink Web UI are wrong. The
"records sent" metric on the UI keeps increasing even though I have already stopped writing data to Kafka. The data in Hive is correct: I wrote 30 records
and Hive indeed returns 30, but the UI already shows 480 and is still climbing.



--
Sent from: http://apache-flink.147419.n8.nabble.com/
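
For reference, a minimal sketch of the DDL change described above, assuming Flink 1.11's Hive integration (catalog name, Hive conf dir, Hive version, and the table schema are all hypothetical): the Hive table is created in the Hive dialect with STORED AS ORC, and the partition-commit properties match the ones quoted further down in this digest.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class OrcHiveTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // hypothetical Hive catalog settings; adjust to the real cluster
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.3.4");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // Hive DDL must be issued in the Hive dialect; STORED AS ORC instead of PARQUET
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
            "CREATE TABLE hive_user_orc (user_id STRING, cnt BIGINT) " +
            "PARTITIONED BY (ts_dt STRING, ts_hour STRING) STORED AS ORC TBLPROPERTIES (" +
            "  'sink.partition-commit.trigger'='process-time'," +
            "  'sink.partition-commit.policy.kind'='metastore,success-file')");
    }
}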

Re: Re: Re: Re: Problem with writing ORC files via Hive streaming

2020-08-14 Thread flink小猪
Based on my tests in the IDE: when writing to a Parquet table, even without the code snippet you sent, the program keeps running and Parquet-related logs are printed at every checkpoint interval;
when writing to an ORC table, however, there is no log output at all, although the program keeps running. I also submitted the same job through the sql-client:
the Parquet table still works without any problem, while the ORC table job restarts endlessly with the following error.

java.io.FileNotFoundException: File does not exist: 
hdfs://nspt-cs/hive/warehouse/hive_user_orc/ts_dt=2020-08-14/ts_hour=17/ts_minute=55/.part-650c3d36-328a-4d8d-8bdd-c170109edfba-0-0.inprogress.398158d9-eaf7-4863-855e-238c7069e298
at 
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309)
 ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
at 
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
 ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
 ~[flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:2.7.5-10.0]
at 
org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:54)
 ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84)
 ~[flink-connector-hive_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at StreamExecCalc$21.processElement(Unknown Source) ~[?:?]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123)
 ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at StreamExecCalc$4.processElement(Unknown Source) ~[?:?]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
   

Reply: Flink 1.11 SQL job submitted to a YARN session fails with java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hbase.util.ByteStringer

2020-08-14 Thread wind.fly....@outlook.com
Finally found the problem. The cause is that flink-dist-*.jar bundles a newer protobuf-java (3.7.1), and in that version LiteralByteString is a private inner class of ByteString:

private static class LiteralByteString extends ByteString.LeafByteString {
  private static final long serialVersionUID = 1L;

  protected final byte[] bytes;

  /**
   * Creates a {@code LiteralByteString} backed by the given array, without copying.
   *
   * @param bytes array to wrap
   */
  LiteralByteString(byte[] bytes) {
    if (bytes == null) {
      throw new NullPointerException();
    }
    this.bytes = bytes;
  }

Meanwhile, the HBase connector (1.4.3) calls new LiteralByteString() when initializing org.apache.hadoop.hbase.util.ByteStringer while reading data; since that class cannot be found, it throws java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hbase.util.ByteStringer.

Solution: exclude the protobuf-java (3.7.1) dependency when building Flink, and provide protobuf-java:2.5.0 as a dependency when submitting.

From: wind.fly@outlook.com
Sent: August 13, 2020, 10:09
To: user-zh@flink.apache.org
Subject: Flink 1.11 SQL job submitted to a YARN session fails with java.lang.NoClassDefFoundError: Could not
initialize class org.apache.hadoop.hbase.util.ByteStringer

Hi, all:
 
I'm trying to upgrade flink-sql-gateway (https://github.com/ververica/flink-sql-gateway) to a version that supports 1.11. After submitting a Flink
SQL job (which uses the HBase connector) to a YARN session, it fails at runtime with:
org.apache.hadoop.hbase.DoNotRetryIOException: 
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.hadoop.hbase.util.ByteStringer
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.translateException(RpcRetryingCaller.java:248)
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:221)
at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:388)
at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:362)
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:142)
at 
org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:80)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.hadoop.hbase.util.ByteStringer
at 
org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:1053)
at 
org.apache.hadoop.hbase.protobuf.RequestConverter.buildScanRequest(RequestConverter.java:496)
at 
org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:402)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:274)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:62)
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:219)
... 7 more

After some searching I suspect the protobuf-java version that hbase-protobuf depends on is wrong, but how can I inspect the runtime classpath of the JM and TM and see which jars they actually depend on? Any analysis approach or method would be appreciated, thanks.


Flink 1.9.1: submitting a job with -d (detached mode) fails, but it runs fine without -d

2020-08-14 Thread bradyMk
Hi everyone:
I submit with the following command:
flink run \
-m yarn-cluster \
-yn 3 \
-ys 3 \
-yjm 2048m \
-ytm 2048m \
-ynm flink_test \
-d \
-c net.realtime.app.FlinkTest ./hotmall-flink.jar
It fails, with the following error:
[AMRM Callback Handler Thread] ERROR
org.apache.flink.yarn.YarnResourceManager - Fatal error occurred in
ResourceManager.
java.lang.NoSuchMethodError:
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
at
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:279)
at
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
[AMRM Callback Handler Thread] ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred
in the cluster entrypoint.
java.lang.NoSuchMethodError:
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.newInstance(IFLjava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/hadoop/yarn/api/records/ResourceBlacklistRequest;)Lorg/apache/hadoop/yarn/api/protocolrecords/AllocateRequest;
at
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:279)
at
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:273)
[flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.yarn.YarnResourceManager - ResourceManager
akka.tcp://flink@emr-worker-8.cluster-174460:33650/user/resourcemanager was
granted leadership with fencing token 
[BlobServer shutdown hook] INFO org.apache.flink.runtime.blob.BlobServer -
Stopped BLOB server at 0.0.0.0:36247
 
But when I submit without -d, it submits and runs normally; even stranger, another job of mine submits fine with the -d flag.
The job that fails was originally started with the following command:
nohup flink run \
-m yarn-cluster \
-yn 3 \
-ys 3 \
-yjm 2048m \
-ytm 2048m \
-ynm flink_test \
-c net.realtime.app.FlinkTest ./hotmall-flink.jar > /logs/flink.log 2>&1 &
 > /logs/nohup.out 2>&1 &

After that job died, restarting it with -d produces the problem described above. It's very strange; does anyone know why?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink SQL tableEnv dependency problem

2020-08-14 Thread 赵一旦
The code is as follows:
// tEnv;
tEnv.sqlUpdate("create table dr1(  " +
"  cid STRING,  " +
"  server_time BIGINT,  " +
"  d MAP,  " +
"  process_time AS PROCTIME(),  " +
"  event_time AS TO_TIMESTAMP(FROM_UNIXTIME(server_time / 1000)),  " +
"  WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND  " +
") WITH (  " +
"  'update-mode' = 'append',  " +
"  'connector.type' = 'kafka',  " +
"  'connector.version' = 'universal',  " +
"  'connector.topic' = 'antibot_dr1',  " +
"  'connector.startup-mode' = 'latest-offset',  " +
"  'connector.properties.zookeeper.connect' =
'yq01-sw-xxx03.yq01:8681',  " +
"  'connector.properties.bootstrap.servers' =
'yq01-sw-xxx03.yq01:8192',  " +
"  'format.type' = 'json'  " +
")");
Table t1 = tEnv.sqlQuery("select * from dr1");

When packaging I bundle flink-json in; the final artifact is test.jar.

test.jar is a fat jar, with all the relevant dependencies included.

Then I run: flink run -c test.SQLWC1 --detached test.jar, and it fails with:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

But flink-json.jar is bundled into the jar, and it still fails...

The workaround is that the machine where the command flink run -c test.SQLWC1 --detached test.jar is executed

must have flink-json in its local Flink environment. But that machine is only used for submission; the cluster that actually runs the job is a different machine.

I don't understand how Flink SQL resolves its dependencies. Is the flink-json bundled into my fat jar simply not considered?


Re: Re: Re: Re: Re: Re: Flink 1.11 StreamingFileWriter and sql-client issues

2020-08-14 Thread kandy.wang
@Jingsong It's ORC format. I've checked all of that and it still isn't committed. I think you could test this scenario on your side.

At 2020-08-12 16:04:13, "Jingsong Li" wrote:
>One more question: which format is it, CSV or Parquet?
>Did you wait until the 10 minutes (rollover-interval) had passed, and check again after the next checkpoint?
>
>On Wed, Aug 12, 2020 at 2:45 PM kandy.wang  wrote:
>
>>
>>
>>
>>
>>
>>
>> Yes, there are. The job had written part of the data, a checkpoint was taken, and then the program was cancelled with a savepoint.
>> On restart it recovered from the latest savepoint, but by then it already belonged to a new partition.
>> It feels like the partition that was being written before the stop never had its commit triggered.
>>
>>
>>
>>
>> At 2020-08-12 14:26:53, "Jingsong Li" wrote:
>> >Then does your earlier partition have any completed files besides the in-progress ones?
>> >
>> >On Wed, Aug 12, 2020 at 1:57 PM kandy.wang  wrote:
>> >
>> >>
>> >>
>> >>
>> >> The source is Kafka,
>>
>> in JSON format, and it is exactly-once. Going by process-time, the data had already been written; when the job came back up, process-time already belonged to a new partition, which is normal, but the state of the old partition had not been committed yet.
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> The in-progress files still being there proves that the partition's data was not finished; in theory the source should rewind and re-consume, so why doesn't the job write to this partition again after the restart?
>> >>
>> >>
>> >>
>> >> The in-progress files still being there proves that the partition's data was not finished; in theory the source should rewind and re-consume, so why doesn't the job write to this partition again after the restart?
>> >>
>> >> At 2020-08-12 13:28:01, "Jingsong Li" wrote:
>> >> >Is your source an exactly-once source?
>> >> >
>> >> >The in-progress files still being there proves that the partition's data was not finished; in theory the source should rewind and re-consume, so why doesn't the job write to this partition again after the restart?
>> >> >
>> >> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:
>> >> >
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> >@ Jingsong
>> >> >>
>> >> >> >The impact is that the partition being written before the stop was never committed. After the program came back up, it wrote to a different partition than before, and there is no _SUCCESS marker file,
>> >> >> so it cannot be queried with Presto.
>> >> >> For example: the 12:35 partition should hold the data from 12:35:00 to 12:39:59,
>> >> >>  'sink.partition-commit.trigger'='process-time',
>> >> >>   'sink.partition-commit.delay'='0 min',
>> >> >>
>>  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>> >> >>   'sink.rolling-policy.check-interval'='30s',
>> >> >>   'sink.rolling-policy.rollover-interval'='10min',
>> >> >>   'sink.rolling-policy.file-size'='128MB'
>> >> >>If a savepoint is taken at around 12:39:05, and then
>> >> >> the program is restarted at 12:41, the earlier 12:35 partition is no longer written to; its in-progress files are still there, but the partition was never committed and no partition was added to Hive,
>> >> >> so the data exists but cannot be queried.
>> >> >>
>> >>
>> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
>> >> >> partition 也能查了。
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >At 2020-08-12 12:11:53, "Jingsong Li" wrote:
>> >> >> >>Do the in-progress files cause any concrete problem? They are leftover files and have no impact on the process.
>> >> >> >>
>> >> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
>> >> >> >>
>> >> >> >>> As far as I know, (2) & (3) will hopefully be supported in 1.12.
>> >> >> >>>
>> >> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang 
>> wrote:
>> >> >> >>>
>> >> >> >>> > 1. StreamingFileWriter:
>> >> testing so far shows that jobs submitted as SQL cannot recover from a checkpoint or savepoint.
>> >> >> >>> >For example: a partition is produced every 5 minutes and data lands by process_time. For the hm=2100 partition, a
>> >> >> >>> > checkpoint or savepoint is taken around 21:04; when the job is restarted, the hm
>> >> >> >>> > =2100 partition still contains many in-progress files.
>> >> >> >>> >
>> Also, I currently don't see any pending files in the HDFS directory; I'd like to understand how the file states transition, since this seems to be implemented differently from the old BucketingSink.
>> >> >> >>> >
>> >> >> >>> >
>> >> >> >>> > 2. The sql-client does not support recovery from a checkpoint/savepoint; when will this be supported?
>> >> >> >>> >
>> >> >> >>> >
>> >> >> >>> > 3. Jobs submitted via the sql-client do not support batch submission with StatementSet; when will this be supported?
>> >> >> >>>
>> >> >> >>
>> >> >> >>
>> >> >> >>--
>> >> >> >>Best, Jingsong Lee
>> >> >>
>> >> >
>> >> >
>> >> >--
>> >> >Best, Jingsong Lee
>> >>
>> >
>> >
>> >--
>> >Best, Jingsong Lee
>>
>
>
>-- 
>Best, Jingsong Lee


Re: Flink job writing to Kafka with EOS enabled and RocksDB state backend: source backpressure, log backlog, and restore from checkpoint fails

2020-08-14 Thread JasonLee
hi

Without logs it's hard to pinpoint the cause of the failure, but if uids are not set, a restart can indeed fail. It's best to set a uid on every operator.



--
Sent from: http://apache-flink.147419.n8.nabble.com/
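
For reference, a minimal sketch of the uid suggestion above, assuming the DataStream API (the in-memory source and print sink are stand-ins for the real Kafka source and sink): giving every stateful operator a stable uid lets its state be matched again when the job is restored from a checkpoint or savepoint.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        env.fromElements("a", "b", "c")    // stand-in for the real Kafka source
                .uid("my-source")          // stable uid: state is matched by uid on restore
                .map(String::toUpperCase)
                .uid("my-map")
                .print()                   // stand-in for the real Kafka sink
                .uid("my-sink");

        env.execute("uid-sketch");
    }
}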

Re: A question about the 'scan.startup.mode' parameter when using a Kafka source in Flink

2020-08-14 Thread JasonLee
hi

The parameter is written correctly: 'scan.startup.mode' = 'earliest-offset'. Are you sure you are using a new group id?
In my test it does consume from the beginning; maybe your testing approach is off.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: HBase sink error: UpsertStreamTableSink requires that Table has a full primary keys

2020-08-14 Thread Jark Wu
The PK issue has been solved in 1.11. You can use the new HBase connector provided in 1.11, which lets you declare the PK in the DDL, so it
no longer errors out when the PK cannot be derived from the query.
See more:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html


Best,
Jark


On Thu, 13 Aug 2020 at 14:27, xiao cai  wrote:

> Hi All:
> When writing to an HBase sink with Flink SQL, I get this error:
> UpsertStreamTableSink requires that Table has a full primary keys if it is
> updated.
>
>
> I created 4 tables in total: 1 Kafka source table, 3 HBase dimension tables, and 1 HBase sink table.
> The result of left-joining the Kafka source table with the HBase dimension tables is inserted into the HBase sink table.
> The SQL is as follows:
> create table user_click_source(
> `id` bigint,
> `name` varchar,
> `kafka_partition` int,
> `event_time` bigint,
> `write_time` bigint,
> `snapshot_time` bigint,
> `max_snapshot_time` bigint,
> `catalog_id` int,
> `device_id` int,
> `user_id` int,
> `proc_time` timestamp(3)
> PRIMARY KEY (id) NOT ENFORCED
> )with(
> 'connector.type' = 'kafka',
> ……
> )
> ;
> create table dim_user(
> `rowkey` varchar,
> cf ROW<
> `id` int,
> `name` varchar,
> `kafka_partition` int,
> `event_time` bigint,
> `write_time` bigint,
> `snapshot_time` bigint,
> `max_snapshot_time` bigint
> >,
> ts bigint
> )with(
> 'connector.type'='hbase',
> ……
> )
> ;
>
>
> create table dim_device(
> `rowkey` varchar,
> cf ROW<
> `id` int,
> `name` varchar,
> `kafka_partition` int,
> `event_time` bigint,
> `write_time` bigint,
> `snapshot_time` bigint,
> `max_snapshot_time` bigint
> >
> )with(
> 'connector.type'='hbase',
> ……
> )
> ;
>
>
> create table dim_catalog(
> `rowkey` varchar,
> cf ROW<
> `id` int,
> `name` varchar,
> `kafka_partition` int,
> `event_time` bigint,
> `write_time` bigint,
> `snapshot_time` bigint,
> `max_snapshot_time` bigint
> >
> )with(
> 'connector.type'='hbase',
> ……
> )
> ;
> create table hbase_full_user_click_case1_sink(
> `rowkey` bigint,
> cf ROW<
> `click_id` bigint,
> `click_name` varchar,
> `click_partition` int,
> `click_event_time` bigint,
> `click_write_time` bigint,
> `click_snapshot_time` bigint,
> `click_max_snapshot_time` bigint,
> `catalog_id` int,
> `catalog_name` varchar,
> `catalog_partition` int,
> `catalog_event_time` bigint,
> `catalog_write_time` bigint,
> `catalog_snapshot_time` bigint,
> `catalog_max_snapshot_time` bigint,
> `device_id` int,
> `device_name` varchar,
> `device_partition` int,
> `device_event_time` bigint,
> `device_write_time` bigint,
> `device_snapshot_time` bigint,
> `device_max_snapshot_time` bigint,
> `user_id` int,
> `user_name` varchar,
> `user_partition` int,
> `user_event_time` bigint,
> `user_write_time` bigint,
> `user_snapshot_time` bigint,
> `user_max_snapshot_time` bigint
> >,
> PRIMARY KEY (rowkey) NOT ENFORCED
> )with(
> 'connector.type'='hbase',
> ……
> )
> ;
> insert into hbase_full_user_click_case1_sink
> select
> `click_id`,
> ROW(
> `click_id`,
> `click_name`,
> `click_partition`,
> `click_event_time`,
> `click_write_time`,
> `click_snapshot_time`,
> `click_max_snapshot_time`,
> `catalog_id`,
> `catalog_name`,
> `catalog_partition`,
> `catalog_event_time`,
> `catalog_write_time`,
> `catalog_snapshot_time`,
> `catalog_max_snapshot_time`,
> `device_id`,
> `device_name`,
> `device_partition`,
> `device_event_time`,
> `device_write_time`,
> `device_snapshot_time`,
> `device_max_snapshot_time`,
> `user_id`,
> `user_name`,
> `user_partition`,
> `user_event_time`,
> `user_write_time`,
> `user_snapshot_time`,
> `user_max_snapshot_time`
> )
> from (select
> click.id as `click_id`,
> click.name as `click_name`,
> click.kafka_partition as `click_partition`,
> click.event_time as `click_event_time`,
> click.write_time as `click_write_time`,
> click.snapshot_time as `click_snapshot_time`,
> click.max_snapshot_time as `click_max_snapshot_time`,
> cat.cf.id as `catalog_id`,
> cat.cf.name as `catalog_name`,
> cat.cf.kafka_partition as `catalog_partition`,
> cat.cf.event_time as `catalog_event_time`,
> cat.cf.write_time as `catalog_write_time`,
> cat.cf.snapshot_time as `catalog_snapshot_time`,
> cat.cf.max_snapshot_time as `catalog_max_snapshot_time`,
> dev.cf.id as `device_id`,
> dev.cf.name as `device_name`,
> dev.cf.kafka_partition as `device_partition`,
> dev.cf.event_time as `device_event_time`,
> dev.cf.write_time as `device_write_time`,
> dev.cf.snapshot_time as `device_snapshot_time`,
> dev.cf.max_snapshot_time as `device_max_snapshot_time`,
> u.cf.id as `user_id`,
> u.cf.name as `user_name`,
> u.cf.kafka_partition as `user_partition`,
> u.cf.event_time as `user_event_time`,
> u.cf.write_time as `user_write_time`,
> u.cf.snapshot_time as `user_snapshot_time`,
> u.cf.max_snapshot_time as `user_max_snapshot_time`
>
>
> from (select
> id,
> `name`,
> `kafka_partition`,
> `event_time`,
> `write_time`,
> `snapshot_time`,
> `max_snapshot_time`,
> cast(catalog_id as varchar) as catalog_key,
> cast(device_id as varchar) as device_key,
> cast(user_id as varchar) as user_key,
> `catalog_id`,
> `device_id`,
> `user_id`,
> `proc_time`,
> `event_time`,
> FROM user_click_source
> GROUP BY TUMBLE(event_time, INTERVAL '1' SECOND),
> `id`,
> `name`,
> `kafka_partition`,
> `event_time`,

Reply: HBase sink error: UpsertStreamTableSink requires that Table has a full primary keys

2020-08-14 Thread xiao cai
Hi Jark:
Thanks for the answer. It turned out that in my join I meant to use HBase as a dimension table, but I had left out the FOR SYSTEM_TIME AS
OF clause; after adding it, the error no longer appears.
One more question: does the "new HBase connector" in 1.11 only mean tables created with version 1.4 specified in the WITH clause?
I found that version 1.4.3 also works fine. Does that mean the PK takes effect with both 1.4 and 1.4.3?
Thanks again.


Best
Xiao Cai

Sent from Mail for Windows 10

From: Jark Wu
Sent: August 14, 2020, 23:23
To: user-zh
Subject: Re: HBase sink error: UpsertStreamTableSink requires that Table has a full
primary keys

The PK issue has been solved in 1.11. You can use the new HBase connector provided in 1.11, which lets you declare the PK in the DDL, so it
no longer errors out when the PK cannot be derived from the query.
See more:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html


Best,
Jark


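
For reference, a minimal sketch that combines the two points above, assuming Flink 1.11 and the connector options from the 1.11 documentation linked above (connection properties, table names, and columns are hypothetical; the Kafka source and HBase sink DDLs are omitted): the 1.11-style HBase DDL declares the PRIMARY KEY directly, and the dimension table is joined with FOR SYSTEM_TIME AS OF a processing-time attribute so it is used as a lookup table.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HBaseLookupJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // 1.11-style HBase dimension table with the PRIMARY KEY declared in the DDL
        tEnv.executeSql(
            "CREATE TABLE dim_user (" +
            "  rowkey STRING," +
            "  cf ROW<id INT, name STRING>," +
            "  PRIMARY KEY (rowkey) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'hbase-1.4'," +
            "  'table-name' = 'dim_user'," +
            "  'zookeeper.quorum' = 'localhost:2181')");

        // ... Kafka source table and HBase sink table DDLs as in the mail above ...

        // lookup (dimension-table) join: FOR SYSTEM_TIME AS OF a processing-time attribute
        tEnv.executeSql(
            "INSERT INTO hbase_sink " +
            "SELECT s.id, ROW(d.cf.id, d.cf.name) " +
            "FROM user_click_source AS s " +
            "JOIN dim_user FOR SYSTEM_TIME AS OF s.proc_time AS d " +
            "ON s.user_key = d.rowkey");
    }
}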

Flink configuration parameter not taking effect

2020-08-14 Thread 魏烽
Hi all:

I set the parameter execution.attached: false in flink-conf.yaml,

but checking with yarn logs shows that the setting did not take effect:

2020-08-15 09:40:13,489 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: execution.attached, true

Moreover, according to the official docs this parameter should default to false. I have confirmed that the code does not set it anywhere. What could be going on here?


Re: A question about the 'scan.startup.mode' parameter when using a Kafka source in Flink

2020-08-14 Thread Benchao Li
In principle, this mode ignores the offsets saved in Kafka even with an old group id.
Are you restoring the job from a checkpoint? Offsets restored from a checkpoint override this setting.

JasonLee <17610775...@163.com> wrote on Fri, Aug 14, 2020, at 10:02 PM:

> hi
>
> The parameter is written correctly: 'scan.startup.mode' = 'earliest-offset'. Are you sure you are using a new group id?
> In my test it does consume from the beginning; maybe your testing approach is off.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li
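
For reference, a minimal sketch of the DDL being discussed, assuming Flink 1.11's Kafka connector (topic, brokers, group id, and schema are hypothetical): 'scan.startup.mode' = 'earliest-offset' makes the source start from the earliest offsets and ignore offsets committed in Kafka, but, as noted above, offsets restored from a checkpoint or savepoint still take precedence over this option.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EarliestOffsetSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.executeSql(
            "CREATE TABLE kafka_src (" +
            "  user_id STRING," +
            "  ts TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'my_topic'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'properties.group.id' = 'my_new_group'," +
            // read from the earliest offsets; a checkpoint/savepoint restore overrides this
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'json')");
    }
}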


No response when uploading a jar through the Flink web UI

2020-08-14 Thread Shiyuan L
When uploading a jar through the Flink web UI, the connection gets reset. I'm not sure why; has anyone run into this?
[image: pic_2020-08-15_10-39-37.png]
[image: pic_2020-08-15_10-40-09.png]


Questions about Flink state TTL cleanup and recomputation

2020-08-14 Thread sunfulin
hi, community,
I'd like to confirm the idle state retention configuration and how it takes effect. I have a job with the TTL set to (10 hours, 10 hours + 5 minutes). Suppose the job starts today at 12:00, and its logic is to record, for each userId, the first login time after 10 o'clock of the current day: select
userId, first_value(xxx) from source group by userId, date_format(eventtime,
'-MM-dd'). Is the state cleaned up 10 hours after the job's start time, or is the cleanup time the last update time of the state plus 10 hours?
I'm using Flink 1.10.1. My initial observation is that state cleanup starts roughly 10 hours after the job start time, which doesn't seem to match expectations. Could someone help confirm this?