Re: flink sql cli error when reading an HBase table

2020-06-28 Thread Leonard Xu
Hello,
This should be a known bug [1]. The cause is that Configuration is not serializable and HBaseRowInputFormat does not handle this correctly, so the ZooKeeper settings from the user's DDL are not passed along. This has been fixed in Flink 1.11 and 1.12. On 1.10.x you can add the HBase configuration files (hbase-default.xml,
hbase-site.xml) to the classpath, or add them to HADOOP_CLASSPATH (the Flink startup scripts check the HADOOP_CLASSPATH environment variable and load it). With either approach both the Flink cluster and the SQL
Client can load the HBase configuration files and therefore pick up the correct ZooKeeper settings from them.
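For a quick sanity check of that workaround, here is a minimal, hypothetical sketch (the class name is made up; it only assumes hbase-default.xml/hbase-site.xml are on the classpath of the process you run it in):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CheckHBaseConf {
    public static void main(String[] args) {
        // HBaseConfiguration.create() loads hbase-default.xml / hbase-site.xml from the classpath,
        // which is exactly what the 1.10.x connector falls back to when the DDL options are lost.
        Configuration conf = HBaseConfiguration.create();
        System.out.println("hbase.zookeeper.quorum = " + conf.get("hbase.zookeeper.quorum"));
        System.out.println("zookeeper.znode.parent = " + conf.get("zookeeper.znode.parent"));
        // If this still prints localhost:2181, the config file is not visible on this classpath.
    }
}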


Best,
Leonard
[1]  https://issues.apache.org/jira/browse/FLINK-17968


> On June 29, 2020 at 11:45, 王良 wrote:
> 
> Hello:
> 
> I am using Flink 1.10 and created an HBase table through the sql-client:
> 
> CREATE TABLE dim_term (
>term_id string,
>info ROW(
>term_name string,
>term_name_combine string,
>term_notice string,
>term_remarks string,
>season string,
>term_sequence string,
>term_start_time string,
>term_end_time string,
>term_description string,
>term_status int,
>is_mvp_term int,
>ctime string,
>utime string
>)
> 
> ) WITH (
> 'connector.type' = 'hbase',
> 'connector.version' = '1.4.3',
> 'connector.table-name' = 'dim_term',
> 'connector.zookeeper.quorum' = 
> 'emr-header-1.cluster-109533:2181,emr-worker-1.cluster-109533:2181,emr-header-2.cluster-109533:2181',
> 'connector.zookeeper.znode.parent' = '/hbase'
> )
> 
> The problem is that when I run select * from dim_term in the sql-client, it fails with:
> 
> 2020-06-29 11:26:51,718 INFO  
> org.apache.flink.addons.hbase.HBaseRowInputFormat 
> org.apache.flink.addons.hbase.HBaseRowInputFormat.configure(HBaseRowInputFormat.java:65)
>  - Initializing HBase configuration.
> 2020-06-29 11:26:51,831 INFO  
> org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper
> org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.(RecoverableZooKeeper.java:120)
>  - Process identifier=hconnection-0x57b9485d connecting to ZooKeeper 
> ensemble=localhost:2181
> 
> 
> org.apache.flink.addons.hbase.HBaseRowInputFormat does not pick up the ZooKeeper configuration.



Re: Unable to generate rowtime, causing the window to fail

2020-06-28 Thread Leonard Xu
Hi,

> field("logictime","TIMESTAMP(3)”)
 报错的原因这个字段在你原始的表中不存在的,理解你的需求是你想用 field evitime(Long型)生成一个新的 field 
logictime(TIMESTAMP(3)),这个可以用计算列解决,Table API上还不支持计算列,1.12 已经在开发中了。你可以用 DDL 
加计算列完成满足你的需求,参考[1]

create table test (
 acct STRING,
 evitime BIGINT,
 logictime AS TO_TIMESTAMP(FROM_UNIXTIME(evitime)),
 WATERMARK FOR logictime AS logictime - INTERVAL '5' SECOND
) with (
...
)
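For completeness, a minimal sketch of wiring this up end to end on Flink 1.10 with the Blink planner; the connector options and the tumbling-window query below are illustrative assumptions, not taken from the original thread:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ComputedRowtimeExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Computed column: turn the BIGINT epoch field into a TIMESTAMP(3) rowtime.
        // Note FROM_UNIXTIME expects seconds; divide by 1000 first if evitime is in milliseconds.
        tEnv.sqlUpdate(
                "CREATE TABLE test (\n" +
                "  acct STRING,\n" +
                "  evitime BIGINT,\n" +
                "  logictime AS TO_TIMESTAMP(FROM_UNIXTIME(evitime)),\n" +
                "  WATERMARK FOR logictime AS logictime - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                "  'connector.type' = 'kafka',\n" +
                "  'connector.version' = 'universal',\n" +
                "  'connector.topic' = 'jes_topic_evtime',\n" +
                "  'connector.properties.bootstrap.servers' = '172.xx.xx.xxx:9092',\n" +
                "  'connector.properties.group.id' = 'grp1',\n" +
                "  'connector.startup-mode' = 'earliest-offset',\n" +
                "  'format.type' = 'json'\n" +
                ")");

        // logictime is now a proper time attribute, so group windows can use it.
        Table windowed = tEnv.sqlQuery(
                "SELECT TUMBLE_START(logictime, INTERVAL '5' SECOND) AS w_start, acct, COUNT(*) AS cnt\n" +
                "FROM test\n" +
                "GROUP BY TUMBLE(logictime, INTERVAL '5' SECOND), acct");
        // Write `windowed` to a sink table and call execute() to run the job.
    }
}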
 
   
Best,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html
 


Re: flink sql group by on a ROW type

2020-06-28 Thread Leonard Xu

> On June 29, 2020 at 12:05, sunfulin wrote:
> 
> In this case the primary key derivation seems to fail (UpsertStreamTableSink requires full primary keys). How should I write this query?

Hi,
In 1.10.x the upsert sink derives the primary key from the query. The cleaner solution is to wait for the 1.11 release and declare the primary key in the CREATE TABLE DDL (PRIMARY KEY ... NOT ENFORCED). To solve it on 1.10.x, the usual approach is to rewrite the query so that the derived primary key matches your expectation. The Elasticsearch sink requires the key fields to be simple types, while your query needs the composite type ROW(c, d), so the rewrite is not straightforward. A more hacky option is to concatenate c and d into a single field c${delimeter}d and build ROW(c, d) with a UDF, which admittedly feels convoluted. If ROW(c, d) is not a hard business requirement and you cannot wait for 1.11, simply adding one extra column in ES is enough.
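As an illustration of that last option (dropping the ROW and exposing a plain extra column for the ES id), a hedged, untested sketch, assuming the Blink planner's built-in CONCAT_WS, a registered table T(a, b, commentId, commentContent), and an arbitrary '|' delimiter that never occurs in the data:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public final class UpsertKeyWorkaround {

    /** Keeps the grouping keys as simple-typed columns so the upsert key can be derived. */
    public static Table rewrite(TableEnvironment tableEnv) {
        // The extra concatenated STRING can act as the document id column in ES.
        return tableEnv.sqlQuery(
            "SELECT a, b, commentId, commentContent,\n" +
            "       CONCAT_WS('|', commentId, commentContent) AS comment_key\n" +
            "FROM T\n" +
            "GROUP BY a, b, commentId, commentContent");
    }
}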

Best,
Leonard Xu

Re: Re: flink sql group by on a ROW type

2020-06-28 Thread sunfulin






hi, Leonard
That query should be OK, but my scenario is the following:
select a, b, row(commentId, commentContent) from T
group by a, b, commentId, commentContent
In this case the primary key derivation seems to fail (UpsertStreamTableSink requires full primary keys). How should I write this query?











On 2020-06-29 10:19:31, "Leonard Xu" wrote:
>Hi,
>The exception message is very helpful. GROUP BY on a ROW is supported in Flink SQL; it is only the ElasticSearchUpsertSink that does not support it. The reason is that
>ElasticSearchUpsertSink needs to convert the GROUP BY key fields into a String to build the
>UpdateRequest/DeleteRequest objects; more precisely, that String becomes the id field (a String) in Elasticsearch.
>So the current ElasticSearchUpsertSink implementation only supports simple types as keys;
>composite types are not supported, because the result of toString() on a composite type may not be what we want.
>
>You can try the query below. The query keys, i.e. the id in ES, will be
>commentId${keyDelimiter}commentContent, which should also be the result you need:
>Select ROW(commentId, commentContent) from T
>group by commentId, commentContent
>
>Best,
>Leonard Xu
>
>> On June 28, 2020 at 22:33, sunfulin wrote:
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> hi, 
>> Thanks, Benchao, for the kind reply. I forgot to paste the exception. I am using Flink 1.10.1 with the Blink planner.
>> 
>> 
>> org.apache.flink.table.api.ValidationException: Only simple types that can 
>> be safely converted into a string representation can be used as keys. But 
>> was: Row(commentId: String, commentContent: String)
>> at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
>> at 
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
>> at 
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
>> at 
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
>> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> at scala.collection.Iterator.foreach(Iterator.scala:937)
>> at scala.collection.Iterator.foreach$(Iterator.scala:937)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at 
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> at 
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
>> at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>> at 
>> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
>> at 
>> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
>> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
>> at 
>> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On 2020-06-28 10:15:34, "Benchao Li" wrote:
>>> Hi,
>>> I don't think there is anywhere that says you cannot GROUP BY a ROW type. Could you tell us which Flink version and which planner you are using?
>>> It would be even better if you could attach the exception stack trace.
>>> 
>>> sunfulin wrote on Thursday, June 25, 2020 at 4:35 PM:
>>> 
 Hi,
 I have a question: I defined a column A of type Row in a sink table.
 When sinking through Flink SQL and trying to GROUP BY A, I get an error saying the ROW type cannot be grouped by. How should I handle this?
>>> 
>>> 
>>> 
>>> -- 
>>> 
>>> Best,
>>> Benchao Li


flink sql cli error when reading an HBase table

2020-06-28 Thread 王良
Hello:

 I am using Flink 1.10 and created an HBase table through the sql-client:

CREATE TABLE dim_term (
term_id string,
info ROW(
term_name string,
term_name_combine string,
term_notice string,
term_remarks string,
season string,
term_sequence string,
term_start_time string,
term_end_time string,
term_description string,
term_status int,
is_mvp_term int,
ctime string,
utime string
)

) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.table-name' = 'dim_term',
'connector.zookeeper.quorum' = 
'emr-header-1.cluster-109533:2181,emr-worker-1.cluster-109533:2181,emr-header-2.cluster-109533:2181',
'connector.zookeeper.znode.parent' = '/hbase'
)

The problem is that when I run select * from dim_term in the sql-client, it fails with:

2020-06-29 11:26:51,718 INFO  org.apache.flink.addons.hbase.HBaseRowInputFormat 

org.apache.flink.addons.hbase.HBaseRowInputFormat.configure(HBaseRowInputFormat.java:65)
 - Initializing HBase configuration.
2020-06-29 11:26:51,831 INFO  
org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper
org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.(RecoverableZooKeeper.java:120)
 - Process identifier=hconnection-0x57b9485d connecting to ZooKeeper 
ensemble=localhost:2181


org.apache.flink.addons.hbase.HBaseRowInputFormat does not pick up the ZooKeeper configuration.
 

Re: [EXTERNAL] Re: Native K8S IAM Role?

2020-06-28 Thread Yang Wang
Using a webhook is really a good direction to support some unreleased Flink
native
k8s features. We are doing the same thing internally.


Best,
Yang

Bohinski, Kevin wrote on Monday, June 29, 2020 at 3:09 AM:

> Hi Yang,
>
>
>
> Awesome, looking forward to 1.11!
>
> In the meantime, we are using a mutating web hook in case anyone else is
> facing this...
>
>
>
> Best,
>
> kevin
>
>
>
>
>
> *From: *Yang Wang 
> *Date: *Saturday, June 27, 2020 at 11:23 PM
> *To: *"Bohinski, Kevin" 
> *Cc: *"user@flink.apache.org" 
> *Subject: *[EXTERNAL] Re: Native K8S IAM Role?
>
>
>
> Hi kevin,
>
>
>
> If you mean to add annotations for Flink native K8s session pods, you
> could use "kubernetes.jobmanager.annotations"
>
> and "kubernetes.taskmanager.annotations"[1]. However, they are only
> supported from release-1.11. Maybe you could
>
> wait for a little bit more time, 1.11 will be released soon. And we add
> more features for native K8s integration in 1.11
>
> (e.g. application mode, label, annotation, toleration, etc.).
>
>
>
>
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#kubernetes
> 
>
>
>
> Best,
>
> Yang
>
>
>
> Bohinski, Kevin wrote on Friday, June 26, 2020 at 3:09 AM:
>
> Hi,
>
>
>
> How do we attach an IAM role to the native K8S sessions?
>
>
>
> Typically for our other pods we use the following in our yamls:
>
> spec:
>   template:
>     metadata:
>       annotations:
>         iam.amazonaws.com/role: ROLE_ARN
>
>
>
> Best
>
> kevin
>
>


Timeout problem when Flink reads from Kafka

2020-06-28 Thread 阿华田
Caused by: java.lang.Exception: 
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition dercd_seeme-3 could be determined  
Has anyone hit this error when Flink reads from Kafka? The current situation is that
the error appears every time the job is restarted, and strangely the job only comes up after several retries. This job reads from relatively many topics (6) with a large data volume. Could it be that the large read volume puts a heavy load on the Kafka brokers and causes the request to time out?


| |
阿华田
|
|
a15733178...@163.com
|



Unable to generate rowtime, causing the window to fail

2020-06-28 Thread naturalfree
Hi all,
   I ran into a problem while using windows and would appreciate your help.
   Briefly: we read data from Kafka, do some processing in Flink, and sink to Elasticsearch. Without a window everything works and the pipeline completes successfully. With a window I get: Exception
 in thread "main" org.apache.flink.table.api.ValidationException: A group
window expects a time attribute for grouping in a stream environment.
 
   Below are the relevant snippets of my pipeline.

1. We use the jar flink-xx_2.12:1.10.0; the Kafka version is 0.11.
2. The Kafka data format is {"acct":"acct1234", "evtime":1593396391819}.
3. We connect to Kafka via descriptors; the code is:
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv);

fsTableEnv.connect(new Kafka()
        .version("universal")
        .topic("jes_topic_evtime")
        .property("zookeeper.connect", "172.xx.xx.xxx:2181")
        .property("bootstrap.servers", "172.xx.xx.xxx:9092")
        .property("group.id", "grp1")
        .startFromEarliest())
    .withFormat(new Json()
        .failOnMissingField(false)
        .deriveSchema())
    .withSchema(new Schema()
        .field("acct", "STRING")
        .field("evtime", "LONG")
        .field("logictime", "TIMESTAMP(3)")
        .rowTime(new Rowtime()
            .timestampsFromField("evtime")
            .watermarksPeriodicBounded(5000)))
    .inAppendMode()
    .createTemporaryTable("testTableName");


Table testTab = fsTableEnv.sqlQuery("SELECT acct, evtime, logictime FROM testTableName")
        .window(Tumble.over("5.seconds").on("logictime").as("w1"))
        .groupBy("w1, acct")
        .select("w1.rowtime, acctno");




Testing shows that, when defining the schema for the Kafka descriptor, neither the rowtime field defined there nor renaming the field with from() seems to work. In my tests, the value returned for the field renamed with from() was null.

Re: flink sql group by on a ROW type

2020-06-28 Thread Leonard Xu
Hi,
The exception message is very helpful. GROUP BY on a ROW is supported in Flink SQL; it is only the ElasticSearchUpsertSink that does not support it. The reason is that ElasticSearchUpsertSink needs to convert the GROUP BY key fields into a String to build the UpdateRequest/DeleteRequest objects; more precisely, that String becomes the id field (a String) in Elasticsearch. So the current ElasticSearchUpsertSink implementation only supports simple types as keys; composite types are not supported, because the result of toString() on a composite type may not be what we want.

You can try the query below. The query keys, i.e. the id in ES, will be commentId${keyDelimiter}commentContent,
which should also be the result you need:
Select ROW(commentId, commentContent) from T
group by commentId, commentContent

Best,
Leonard Xu

> On June 28, 2020 at 22:33, sunfulin wrote:
> 
> 
> 
> 
> 
> 
> 
> hi, 
> Thanks, Benchao, for the kind reply. I forgot to paste the exception. I am using Flink 1.10.1 with the Blink planner.
> 
> 
> org.apache.flink.table.api.ValidationException: Only simple types that can be 
> safely converted into a string representation can be used as keys. But was: 
> Row(commentId: String, commentContent: String)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at 
> com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
> at 
> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
> at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
> at 
> com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-06-28 10:15:34, "Benchao Li" wrote:
>> Hi,
>> I don't think there is anywhere that says you cannot GROUP BY a ROW type. Could you tell us which Flink version and which planner you are using?
>> It would be even better if you could attach the exception stack trace.
>> 
>> sunfulin wrote on Thursday, June 25, 2020 at 4:35 PM:
>> 
>>> Hi,
>>> I have a question: I defined a column A of type Row in a sink table.
>>> When sinking through Flink SQL and trying to GROUP BY A, I get an error saying the ROW type cannot be grouped by. How should I handle this?
>> 
>> 
>> 
>> -- 
>> 
>> Best,
>> Benchao Li



Re: Optimal Flink configuration for Standalone cluster.

2020-06-28 Thread Xintong Song
>
> Since changing off-heap removes memory from '.task.heap.size' is there a
> ratio that I should follow for better performance?
>
I don't think so. It could really be specific to your workload. Some
workload may need more heap memory while others may need more off-heap.

Also, my guess (since I am dealing with big datasets) is that the more
> '.flink.size' I provide the better. Is that correct?
>
In most cases, yes. But it is also possible the other way around. Larger
`.flink.size` usually also means larger JVM heap space, which reduces the
frequency of GCs but increases the time cost on each GC (especially full
GCs). On the other hand, if the memory is large enough, it could become the
CPU resource rather than the memory that limits the performance. In such
cases, increasing memory size won't give you more performance improvement
but might introduce more GC overheads, thus harm the overall performance.

In this particular cluster, since every Machine has 252 total DRAM and
> worst case scenario 180GB is free to use, should I just say .flink.size:
> 180g?
>
Not sure about this. I would suggest to avoid large task managers (say tens
of GBs) unless absolutely necessary. Alternatively, you can try to launch
multiple TMs on one physical machine, to reduce the memory size of each TM
process.

BTW, what kind of workload are you running? Is it streaming or batch?


Thank you~

Xintong Song



On Mon, Jun 29, 2020 at 1:18 AM Dimitris Vogiatzidakis <
dimitrisvogiatzida...@gmail.com> wrote:

> Hi Xintong,
> Thank you for the quick response.
> doing 1), without increasing  'task.off-heap.size'  does not change the
> issue, but increasing the off-heap alone does.
> What should the off-heap value size be? Since changing off-heap removes
> memory from '.task.heap.size' is there a ratio that I should follow for
> better performance?
> Also, my guess (since I am dealing with big datasets) is that the more
> '.flink.size' I provide the better. Is that correct? Or will it add extra
> 'overhead' that could slow down my computations? In this particular
> cluster, since every Machine has 252 total DRAM and worst case scenario
> 180GB is free to use, should I just say .flink.size: 180g?
>
> Thank you very much and sorry if i'm asking silly questions.
> Dimitris Vogiatzidakis
>
> On Sun, Jun 28, 2020 at 5:25 AM Xintong Song 
> wrote:
>
>> Hi Dimitris,
>>
>> Regarding your questions.
>> a) For standalone clusters, the recommended way is to use `.flink.size`
>> rather than `.process.size`. `.process.size` includes JVM metaspace and
>> overhead in addition to `.flink.size`, which usually do not really matter
>> for standalone clusters.
>> b) In case of direct OOMs, you should increase
>> `taskmanager.memory.task.off-heap.size`. There's no fraction for that.
>> c) Your understanding is correct. And you can also specify the absolute
>> network memory size by setting the min and max to the same value.
>>
>> Here are my suggestions according to what you described.
>>
>>1. Since both off-heap and network memory seems insufficient, I would
>>suggest to increase `taskmanager.memory.flink.size` to give your task
>>managers more memory in total.
>>2. If 1) does not work, I would suggest not to set the total memory
>>(means configure neither `.flink.size` nor `process.size`), but go for the
>>fine grained configuration where explicitly specify the individual memory
>>components. Flink will automatically add them up to derive the total 
>> memory.
>>   1. In addition to `.task.off-heap.size` and `.network.[min|max]`,
>>   you will also need to set `.task.heap.size` and `managed.size`.
>>   2. If you don't know how many heap/managed memory to configure,
>>   you can look for the configuration options in the beginning of the TM 
>> logs
>>   (`-Dkey=value`). Those are the values derived from your current
>>   configuration.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis <
>> dimitrisvogiatzida...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I'm having a bit of trouble understanding the memory configuration on
>>> flink.
>>> I'm using flink10.0.0 to read some datasets of edges and extract
>>> features. I run this on a cluster consisting of 4 nodes , with 32cores and
>>> 252GB Ram each, and hopefully I could expand this as long as I can add
>>> extra nodes to the cluster.
>>>
>>> So regarding the configuration file (flink-conf.yaml).
>>> a) I can't understand when should I use process.size and when
>>> .flink.size.
>>>
>>> b) From the detailed memory model I understand that Direct memory is
>>> included in both of flink and process size, however if I don't specify
>>> off-heap.task.size I get
>>> " OutOfMemoryError: Direct buffer memory " .  Also should I change
>>> off-heap.fraction as well?
>>>
>>> c)When I fix this, I get network buffers error, which if I understand
>>> correctly,  flink.size * network fraction , should be between min 

Re: In Flink SQL, the result is always false when a value is null

2020-06-28 Thread Leonard Xu
Hello,
An update: after discussing with Benchao, this community issue (FLINK-18164) has been closed, because Flink's current behavior when handling null is correct. So when dealing with null, the recommendation is to filter first with IS NULL / IS NOT NULL and only then apply the logical condition; this is how it is usually handled in plain SQL as well.
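For illustration, the two recommended patterns side by side, as a minimal sketch with hypothetical table and column names:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public final class NullHandlingExamples {

    /** Assumes a registered table `orders` with a nullable STRING column `product`. */
    public static void examples(TableEnvironment tableEnv) {
        // Make the three-valued logic explicit: without IS NULL, rows whose product is NULL
        // are dropped, because the WHERE clause treats UNKNOWN as FALSE.
        Table explicitNullCheck = tableEnv.sqlQuery(
            "SELECT * FROM orders WHERE product IS NULL OR product <> 'rubber'");

        // IS [NOT] DISTINCT FROM is null-aware and only ever returns TRUE or FALSE,
        // so NULL IS DISTINCT FROM 'rubber' evaluates to TRUE.
        Table nullAwareCompare = tableEnv.sqlQuery(
            "SELECT * FROM orders WHERE product IS DISTINCT FROM 'rubber'");
    }
}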

Best,
Leonard Xu

> On June 7, 2020 at 17:22, Leonard Xu wrote:
> 
> Hi,
> Flink uses Calcite for SQL parsing and optimization. This is a two-valued vs. three-valued boolean logic issue: by default Calcite processes the WHERE clause [2]
> in UNKNOWN_AS_FALSE mode, so this result is expected. The same applies to "x IS TRUE", "JOIN ... ON x",
> and "HAVING x".
> 
> [1] 
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java#L1016
>  
> 
> [2] 
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java#L3462
>  
> 
> 
> Best,
> Leonard Xu
> 
>> On June 6, 2020 at 11:45, Benchao Li wrote:
>> 
>> Wow, great!
>> I also looked it up: in the SQL standard, a boolean expression has three possible values [1]: true, false, and unknown.
>> Moreover, null is normally not equal to any value, including another null [2].
>> 
>> So executing `SELECT null <>
>> null` should return unknown; in Flink this should be null rather than true or false.
>> When this situation occurs in a WHERE condition, the comparison result should also be unknown [3], but by default it is treated as false.
>> 
>> `IS [NOT] DISTINCT FROM` is designed exactly for comparisons involving null. Because it can handle null, its return value
>> is only ever true or false, never unknown. For your scenario it should be the best fit.
>> 
>> PS: When replying, remember to "reply all" so that everyone in the community can see the discussion and benefit from it.
>> 
>> [1] https://modern-sql.com/concept/three-valued-logic 
>> 
>> [2] https://modern-sql.com/feature/is-distinct-from 
>> 
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html#comparison-functions
>>  
>> 
>> 
>> whirly wrote on Saturday, June 6, 2020 at 10:42 AM:
>> 
>>> Hi.
>>> I just found the solution: among the Flink SQL built-in functions there is another logical operator, IS DISTINCT FROM,
>>> that solves this problem.
>>> IS DISTINCT FROM is also a not-equal comparison; unlike <>, Null IS DISTINCT FROM someValue evaluates to True.
>>> 
>>> best,
>>> whirly
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On 2020-06-06 00:59:12, "Benchao Li" wrote:
>>> 
>>> Hi,
>>> 
>>> I thought about this problem again, and I do think this behavior is a bit unreasonable, so I created an issue [1] to track it.
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-18164
>>> 
>>> whirly wrote on Friday, June 5, 2020 at 11:20 PM:
>>> 
 OK, it might just be a display issue in my mail client. Thanks for the reply.
 
 Regarding adding an extra is not null check: the original poster and I both feel it is a bit redundant, and sometimes there are many fields in a condition, so prefixing every field with is
 not null is cumbersome and error-prone.
 
 It would be nice if there were a configuration option to make null <> 'someValue' evaluate to true.
 
 
 
 
 whirly
 Email: whir...@163.com
 
 
 
 
 On June 5, 2020 at 23:08, Benchao Li wrote:
 Hi,
 
 I only received one copy of your first email, so it looks fine on my side. (It may be a local display issue in your mail client.)
 As for your question, that is indeed how it is handled right now. May I ask what problem an extra IS NOT NULL check causes?
 
 whirly wrote on Friday, June 5, 2020 at 9:54 PM:
 
> Sorry, I just noticed that my 163 mailbox automatically resent this question several times. I don't know why; maybe a mailbox bug? Apologies for the noise, and I am not sure whether it will keep resending.
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-06-05 14:25:10, "whirly" wrote:
>> Hi all:
>>   In Flink SQL, with a query like SELECT * from order where product <>
 'rubber', rows whose
> product field is null do not match the condition product <> 'rubber', even though null is indeed not equal to
 'rubber'.
>>   They only match after the condition is changed to where product is Null or product <> 'rubber'.
>>   But I would like null <> 'rubber' to also evaluate to True, without adding a product is Null
> check in front of the condition. What can I do?
>> 
>> 
>> Thanks
> 
 
 
 --
 
 Best,
 Benchao Li
 
 
>>> 
>>> --
>>> 
>>> Best,
>>> Benchao Li
>>> 
>>> 
>> 
>> -- 
>> 
>> Best,
>> Benchao Li
> 



Re: [Flink SQL's handling of NULL in inequality comparisons]

2020-06-28 Thread Leonard Xu

> The community had an issue [1] tracking this. Until then, the recommendation when dealing with null is to filter first with IS NULL / IS NOT NULL
> and only then apply the logical condition; this is how it is usually handled in plain SQL as well.
> 
> Best,
> Leonard Xu
> [1] https://issues.apache.org/jira/browse/FLINK-18164 
> 
> 

An update: after discussing with Benchao, this community issue (FLINK-18164)
has been closed, because Flink's current behavior when handling null is correct. So when dealing with null, the recommendation is to filter first with IS NULL / IS NOT NULL
and only then apply the logical condition; this is how it is usually handled in plain SQL as well.

Best,
Leonard Xu

Re: Heartbeat of TaskManager timed out.

2020-06-28 Thread Xintong Song
In Flink 1.10, there's a huge change in the memory management compared to
previous versions. This could be related to your observations, because with
the same configurations, it is possible that there's less JVM heap space
(with more off-heap memory). Please take a look at this migration guide [1].

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html

On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski  wrote:

> Thanks for the suggestions!
>
> > i recently tried 1.10 and see this error frequently. and i dont have the
> same issue when running with 1.9.1
> I did downgrade to Flink 1.9 and there's certainly no change in the
> occurrences in the heartbeat timeout
>
>
> >
>
>- Probably the most straightforward way is to try increasing the
>timeout to see if that helps. You can leverage the configuration option
>`heartbeat.timeout`[1]. The default is 50s.
>- It might be helpful to share your configuration setups (e.g., the TM
>resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>share the beginning part of your JM/TM logs, including the JVM parameters
>and all the loaded configurations.
>- You may want to look into the GC logs in addition to the metrics. In
>case of a CMS GC stop-the-world, you may not be able to see the most recent
>metrics due to the process not responding to the metric querying services.
>- You may also look into the status of the JM process. If JM is under
>significant GC pressure, it could also happen that the heartbeat message
>from TM is not timely handled before the timeout check.
>- Is there any metrics monitoring the network condition between the JM
>and timeouted TM? Possibly any jitters?
>
>
> Weirdly enough, I did manage to find a problem with the timed out
> TaskManagers, which slipped away the last time I checked: The timed out
> TaskManager is always the one with the max. GC time (young generation). I
> see it only now that I run with G1GC, but with the previous GC it wasn't
> the case.
>
> Does anyone know what can cause high GC time and how to mitigate this?
>
> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song 
> wrote:
>
>> Hi Ori,
>>
>> Here are some suggestions from my side.
>>
>>- Probably the most straightforward way is to try increasing the
>>timeout to see if that helps. You can leverage the configuration option
>>`heartbeat.timeout`[1]. The default is 50s.
>>- It might be helpful to share your configuration setups (e.g., the
>>TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>share the beginning part of your JM/TM logs, including the JVM parameters
>>and all the loaded configurations.
>>- You may want to look into the GC logs in addition to the metrics.
>>In case of a CMS GC stop-the-world, you may not be able to see the most
>>recent metrics due to the process not responding to the metric querying
>>services.
>>- You may also look into the status of the JM process. If JM is under
>>significant GC pressure, it could also happen that the heartbeat message
>>from TM is not timely handled before the timeout check.
>>- Is there any metrics monitoring the network condition between the
>>JM and timeouted TM? Possibly any jitters?
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>
>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski  wrote:
>>
>>> Hello,
>>>
>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189 partitions
>>> and I have parallelism of 189.
>>>
>>> Currently running with RocksDB, with checkpointing disabled. My state
>>> size is appx. 500gb.
>>>
>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors with no
>>> apparent reason.
>>>
>>> I check the container that gets the timeout for GC pauses, heap memory,
>>> direct memory, mapped memory, offheap memory, CPU load, network load, total
>>> out-records, total in-records, backpressure, and everything I can think of.
>>> But all those metrics show that there's nothing unusual, and it has around
>>> average values for all those metrics. There are a lot of other containers
>>> which score higher.
>>>
>>> All the metrics are very low because every TaskManager runs on a
>>> r5.2xlarge machine alone.
>>>
>>> I'm trying to debug this for days and I cannot find any explanation for
>>> it.
>>>
>>> Can someone explain why it's happening?
>>>
>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
>>> container_1593074931633_0011_01_000127 timed out.
>>> at org.apache.flink.runtime.jobmaster.
>>> JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster
>>> .java:1147)
>>> at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(
>>> HeartbeatMonitorImpl.java:109)
>>> at 

Re: [EXTERNAL] Re: Native K8S IAM Role?

2020-06-28 Thread Bohinski, Kevin
Hi Yang,

Awesome, looking forward to 1.11!
In the meantime, we are using a mutating web hook in case anyone else is facing 
this...

Best,
kevin


From: Yang Wang 
Date: Saturday, June 27, 2020 at 11:23 PM
To: "Bohinski, Kevin" 
Cc: "user@flink.apache.org" 
Subject: [EXTERNAL] Re: Native K8S IAM Role?

Hi kevin,

If you mean to add annotations for Flink native K8s session pods, you could use 
"kubernetes.jobmanager.annotations"
and "kubernetes.taskmanager.annotations"[1]. However, they are only supported 
from release-1.11. Maybe you could
wait for a little bit more time, 1.11 will be released soon. And we add more 
features for native K8s integration in 1.11
(e.g. application mode, label, annotation, toleration, etc.).


[1]. 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#kubernetes

Best,
Yang

Bohinski, Kevin (kevin_bohin...@comcast.com) wrote on Friday, June 26, 2020 at 3:09 AM:
Hi,

How do we attach an IAM role to the native K8S sessions?

Typically for our other pods we use the following in our yamls:
spec:
  template:
    metadata:
      annotations:
        iam.amazonaws.com/role: ROLE_ARN

Best
kevin


Re: Running Apache Flink on the GraalVM as a Native Image

2020-06-28 Thread Stephen Connolly
On Sun 28 Jun 2020 at 01:34, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

>
>
> On Thu 25 Jun 2020 at 12:48, ivo.kn...@t-online.de 
> wrote:
>
>> Whats up guys,
>>
>>
>>
>> I'm trying to run an Apache Flink Application with the GraalVM Native
>> Image but I get the following error: (check attached file)
>>
>>
>>
>> I suppose this happens, because Flink uses a lot of low-level-code and is
>> highly optimized.
>>
>
> Actually I suspect the reason is that Flink uses dynamic classloading.
>
> GraalVM requires all the code available in order to produce a native image.
>
> You’d need to pre-bind the topology you want Flink to run into the native
> image.
>
> More fun, you’ll actually need two images, one for the job manager and one
> for the task manager.
>
> And you’ll need to convince GraalVM that the entry-point is your topology
> needs reflection support enabled... plus whatever other classes use
> reflection in Flink.
>
> Sounds rather complex to me. If native images are what is important to
> you, there seemed to be a strong contender in the Rust language community,
> didn’t provide as strong management as Flink, and you’d probably have more
> work managing things like checkpointing, but if native code is important
> that’s where I’d be looking. Sadly I cannot remember the name and my
> google-foo is weak tonight
>

I think it might have been
https://github.com/grippy/tempest but that looks less actively developed
than the one I thought I saw...

I’d also check out frameworks for Go even if I dislike Go... if you want
native code it’s either Rust or Go in my book


>
>>
>> When I googled the combination of GraalVM Native Image and Apache Flink I
>> get no results.
>>
>>
>>
>> Did anyone ever succeeded in making it work and how?
>>
>>
>>
>> Best regards,
>>
>>
>>
>> Ivo
>> 
>>
> --
> Sent from my phone
>
-- 
Sent from my phone


Re: Optimal Flink configuration for Standalone cluster.

2020-06-28 Thread Dimitris Vogiatzidakis
Hi Xintong,
Thank you for the quick response.
doing 1), without increasing  'task.off-heap.size'  does not change the
issue, but increasing the off-heap alone does.
What should the off-heap value size be? Since changing off-heap removes
memory from '.task.heap.size' is there a ratio that I should follow for
better performance?
Also, my guess (since I am dealing with big datasets) is that the more
'.flink.size' I provide the better. Is that correct? Or will it add extra
'overhead' that could slow down my computations? In this particular
cluster, since every Machine has 252 total DRAM and worst case scenario
180GB is free to use, should I just say .flink.size: 180g?

Thank you very much and sorry if i'm asking silly questions.
Dimitris Vogiatzidakis

On Sun, Jun 28, 2020 at 5:25 AM Xintong Song  wrote:

> Hi Dimitris,
>
> Regarding your questions.
> a) For standalone clusters, the recommended way is to use `.flink.size`
> rather than `.process.size`. `.process.size` includes JVM metaspace and
> overhead in addition to `.flink.size`, which usually do not really matter
> for standalone clusters.
> b) In case of direct OOMs, you should increase
> `taskmanager.memory.task.off-heap.size`. There's no fraction for that.
> c) Your understanding is correct. And you can also specify the absolute
> network memory size by setting the min and max to the same value.
>
> Here are my suggestions according to what you described.
>
>1. Since both off-heap and network memory seems insufficient, I would
>suggest to increase `taskmanager.memory.flink.size` to give your task
>managers more memory in total.
>2. If 1) does not work, I would suggest not to set the total memory
>(means configure neither `.flink.size` nor `process.size`), but go for the
>fine grained configuration where explicitly specify the individual memory
>components. Flink will automatically add them up to derive the total 
> memory.
>   1. In addition to `.task.off-heap.size` and `.network.[min|max]`,
>   you will also need to set `.task.heap.size` and `managed.size`.
>   2. If you don't know how many heap/managed memory to configure, you
>   can look for the configuration options in the beginning of the TM logs
>   (`-Dkey=value`). Those are the values derived from your current
>   configuration.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sat, Jun 27, 2020 at 10:56 PM Dimitris Vogiatzidakis <
> dimitrisvogiatzida...@gmail.com> wrote:
>
>> Hello,
>>
>> I'm having a bit of trouble understanding the memory configuration on
>> flink.
>> I'm using flink10.0.0 to read some datasets of edges and extract
>> features. I run this on a cluster consisting of 4 nodes , with 32cores and
>> 252GB Ram each, and hopefully I could expand this as long as I can add
>> extra nodes to the cluster.
>>
>> So regarding the configuration file (flink-conf.yaml).
>> a) I can't understand when should I use process.size and when
>> .flink.size.
>>
>> b) From the detailed memory model I understand that Direct memory is
>> included in both of flink and process size, however if I don't specify
>> off-heap.task.size I get
>> " OutOfMemoryError: Direct buffer memory " .  Also should I change
>> off-heap.fraction as well?
>>
>> c)When I fix this, I get network buffers error, which if I understand
>> correctly,  flink.size * network fraction , should be between min and max.
>>
>> I can't find the 'perfect' configuration regarding my setup. What is the
>> optimal way to use the system I have currently?
>>
>> Thank you for your time.
>>
>>
>>


Re: Flink Sql

2020-06-28 Thread ????????
Hi,

The table currently has columns c1, c2, c3, and a new column c4 is being added.
After the ALTER TABLE, the desired column order is c1, c4, c2, c3.
Thanks.


---------------- Original message ----------------
From: "Jark Wu"

Re: Re: flink sql group by on a ROW type

2020-06-28 Thread sunfulin






hi, 
Thanks, Benchao, for the kind reply. I forgot to paste the exception. I am using Flink 1.10.1 with the Blink planner.


org.apache.flink.table.api.ValidationException: Only simple types that can be 
safely converted into a string representation can be used as keys. But was: 
Row(commentId: String, commentContent: String)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.validateKeyTypes(ElasticsearchUpsertTableSinkBase.java:310)
at 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.setKeyFields(ElasticsearchUpsertTableSinkBase.java:152)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:111)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:685)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:93)
at 
com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.doJob(ArotaLiveZLCFTRealtimeJob.java:42)
at com.htsc.crm_realtime.fatjob.Jobs.JobEntryBase.run(JobEntryBase.java:65)
at 
com.htsc.crm_realtime.fatjob.Jobs.zhibo.ArotaLiveZLCFTRealtimeJob.main(ArotaLiveZLCFTRealtimeJob.java:47)















On 2020-06-28 10:15:34, "Benchao Li" wrote:
>Hi,
>I don't think there is anywhere that says you cannot GROUP BY a ROW type. Could you tell us which Flink version and which planner you are using?
>It would be even better if you could attach the exception stack trace.
>
>sunfulin wrote on Thursday, June 25, 2020 at 4:35 PM:
>
>> Hi,
>> I have a question: I defined a column A of type Row in a sink table.
>> When sinking through Flink SQL and trying to GROUP BY A, I get an error saying the ROW type cannot be grouped by. How should I handle this?
>
>
>
>-- 
>
>Best,
>Benchao Li


Re: Heartbeat of TaskManager timed out.

2020-06-28 Thread Ori Popowski
Thanks for the suggestions!

> i recently tried 1.10 and see this error frequently. and i dont have the
same issue when running with 1.9.1
I did downgrade to Flink 1.9 and there's certainly no change in the
occurrences in the heartbeat timeout


>

   - Probably the most straightforward way is to try increasing the timeout
   to see if that helps. You can leverage the configuration option
   `heartbeat.timeout`[1]. The default is 50s.
   - It might be helpful to share your configuration setups (e.g., the TM
   resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
   share the beginning part of your JM/TM logs, including the JVM parameters
   and all the loaded configurations.
   - You may want to look into the GC logs in addition to the metrics. In
   case of a CMS GC stop-the-world, you may not be able to see the most recent
   metrics due to the process not responding to the metric querying services.
   - You may also look into the status of the JM process. If JM is under
   significant GC pressure, it could also happen that the heartbeat message
   from TM is not timely handled before the timeout check.
   - Is there any metrics monitoring the network condition between the JM
   and timeouted TM? Possibly any jitters?


Weirdly enough, I did manage to find a problem with the timed out
TaskManagers, which slipped away the last time I checked: The timed out
TaskManager is always the one with the max. GC time (young generation). I
see it only now that I run with G1GC, but with the previous GC it wasn't
the case.

Does anyone know what can cause high GC time and how to mitigate this?

On Sun, Jun 28, 2020 at 5:04 AM Xintong Song  wrote:

> Hi Ori,
>
> Here are some suggestions from my side.
>
>- Probably the most straightforward way is to try increasing the
>timeout to see if that helps. You can leverage the configuration option
>`heartbeat.timeout`[1]. The default is 50s.
>- It might be helpful to share your configuration setups (e.g., the TM
>resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>share the beginning part of your JM/TM logs, including the JVM parameters
>and all the loaded configurations.
>- You may want to look into the GC logs in addition to the metrics. In
>case of a CMS GC stop-the-world, you may not be able to see the most recent
>metrics due to the process not responding to the metric querying services.
>- You may also look into the status of the JM process. If JM is under
>significant GC pressure, it could also happen that the heartbeat message
>from TM is not timely handled before the timeout check.
>- Is there any metrics monitoring the network condition between the JM
>and timeouted TM? Possibly any jitters?
>
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>
> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski  wrote:
>
>> Hello,
>>
>> I'm running Flink 1.10 on EMR and reading from Kafka with 189 partitions
>> and I have parallelism of 189.
>>
>> Currently running with RocksDB, with checkpointing disabled. My state
>> size is appx. 500gb.
>>
>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors with no
>> apparent reason.
>>
>> I check the container that gets the timeout for GC pauses, heap memory,
>> direct memory, mapped memory, offheap memory, CPU load, network load, total
>> out-records, total in-records, backpressure, and everything I can think of.
>> But all those metrics show that there's nothing unusual, and it has around
>> average values for all those metrics. There are a lot of other containers
>> which score higher.
>>
>> All the metrics are very low because every TaskManager runs on a
>> r5.2xlarge machine alone.
>>
>> I'm trying to debug this for days and I cannot find any explanation for
>> it.
>>
>> Can someone explain why it's happening?
>>
>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
>> container_1593074931633_0011_01_000127 timed out.
>> at org.apache.flink.runtime.jobmaster.
>> JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster
>> .java:1147)
>> at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(
>> HeartbeatMonitorImpl.java:109)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors
>> .java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(
>> AkkaRpcActor.java:397)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
>> AkkaRpcActor.java:190)
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
>> AkkaRpcActor.java:152)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at 

Re: High-availability cluster

2020-06-28 Thread cs
Go with YARN if you have it: compared with standalone, a standalone cluster can only run Flink jobs,
while a YARN cluster can also run Spark and MapReduce jobs.


---------------- Original message ----------------
From: "LakeShen"

Re: High-availability cluster

2020-06-28 Thread LakeShen
Hi 李军,

On Yarn we currently use the Flink on Yarn per-job mode; on K8s we use the standalone
per-job mode.

Best,
LakeShen

刘佳炜 wrote on Sunday, June 28, 2020 at 5:14 PM:

> If your company uses Hadoop, go with YARN; standalone is generally for single-machine testing and practice.
>
>
>
> Sent from my iPhone
>
>
> -- Original message --
> From: 李军  Sent: June 28, 2020 17:11
> To: user-zh  Subject: Re: High-availability cluster
>
>
> May I ask which cluster setup you all use in production?
>  1. Standalone cluster
>  2. Yarn cluster
>
>  What is the reasoning? I am not sure how to choose.
>
>
> 2020-6-28
> | |
> 李军
> |
> |
> hold_li...@163.com
> |


Some questions about registering timers

2020-06-28 Thread Jun Zhang
Hi all,
 According to the official documentation, timers can only be registered on a keyed stream. I found that a KeyedBroadcastProcessFunction applied to a BroadcastConnectedStream
can also register timers. In my tests this only works with processing time; with event time the timers do not fire. What is the reason for this? Thanks.
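For reference, a minimal sketch of the setup described above (all type parameters and names are made up). Note that, as a general Flink rule, event-time timers only fire once the watermark passes the registered timestamp, so the inputs of the connected stream need assigned timestamps/watermarks for onTimer to be called in event time:

import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Keyed input: String elements keyed by String; broadcast input: String rules; output: String.
public class TimerExample extends KeyedBroadcastProcessFunction<String, String, String, String> {

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) {
        // Processing-time timers fire from the wall clock and work as soon as time passes.
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 10_000L);

        // Event-time timers fire only when the watermark passes this timestamp.
        ctx.timerService().registerEventTimeTimer(
                ctx.timerService().currentWatermark() + 10_000L);
    }

    @Override
    public void processBroadcastElement(String rule, Context ctx, Collector<String> out) {
        // No key context here, so no timers can be registered on the broadcast side.
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        out.collect("timer fired at " + timestamp);
    }
}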


Error reporting for Flink jobs

2020-06-28 Thread Satyam Shekhar
Hello,

I am using Flink as the query engine for running SQL queries on both batch
and streaming data. I use the Blink planner in batch and streaming mode
respectively for the two cases.

In my current setup, I execute the batch queries synchronously via
StreamTableEnvironment::execute method. The job uses OutputFormat to
consume results in StreamTableSink and send it to the user. In case there
is an error/exception in the pipeline (possibly to user code), it is not
reported to OutputFormat or the Sink. If an error occurs after the
invocation of the write method on OutputFormat, the implementation may
falsely assume that the result successful and complete since close is
called in both success and failure cases. I can work around this, by
checking for exceptions thrown by the execute method but that adds extra
latency due to job tear down cost.
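For what it is worth, a minimal sketch of that synchronous workaround on the 1.10 Table API (all names below are placeholders, not part of the original report):

import org.apache.flink.table.api.TableEnvironment;

public final class BatchQueryRunner {

    /** Returns only if the whole pipeline, including user code, completed successfully. */
    public static void runOrThrow(TableEnvironment tableEnv, String insertIntoSinkSql) {
        try {
            tableEnv.sqlUpdate(insertIntoSinkSql); // e.g. "INSERT INTO sink SELECT ..."
            tableEnv.execute("batch query");       // blocks and rethrows any pipeline failure
        } catch (Exception e) {
            // The sink/OutputFormat itself is never told about the failure, so this is
            // currently the only reliable place to observe it.
            throw new RuntimeException("Flink job failed", e);
        }
    }
}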

A similar problem also exists for streaming jobs. In my setup, streaming
jobs are executed asynchronously via StreamExecuteEnvironment::executeAsync.
Since the sink interface has no methods to receive errors in the pipeline,
the user code has to periodically track and manage persistent failures.

Have I missed something in the API? Or Is there some other way to get
access to error status in user code?

Regards,
Satyam


Re: Problem writing data with ParquetAvroWriters and a schema in Flink 1.10

2020-06-28 Thread 夏帅
Hi, judging from the exception, this problem is caused by using TupleTypeInfo; you could try GenericRecordAvroTypeInfo instead.
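A rough, untested sketch of that change, reusing parquetSinkSchema, fileSinkPath, flinkTableEnv and the table `test` from the original code and assuming the same two STRING columns `id` and `time`:

// Needs: org.apache.flink.types.Row, org.apache.flink.api.common.functions.RichMapFunction,
// org.apache.flink.configuration.Configuration, org.apache.avro.Schema, org.apache.avro.generic.*,
// and org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo from flink-avro.
String schemaString = parquetSinkSchema.toString();

// Convert the Table to Row instead of Tuple2, then map each Row to an Avro GenericRecord and
// declare the type with GenericRecordAvroTypeInfo, so the Parquet writer really receives
// GenericRecord instances (the ClassCastException came from handing it Tuple2).
DataStream<Row> rows = flinkTableEnv.toAppendStream(test, Row.class);

DataStream<GenericRecord> records = rows
    .map(new RichMapFunction<Row, GenericRecord>() {
        private transient Schema schema;

        @Override
        public void open(Configuration parameters) {
            // Avro Schema is not serializable in older Avro versions, so re-parse it per task.
            schema = new Schema.Parser().parse(schemaString);
        }

        @Override
        public GenericRecord map(Row row) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("id", row.getField(0));
            record.put("time", row.getField(1));
            return record;
        }
    })
    .returns(new GenericRecordAvroTypeInfo(parquetSinkSchema));

StreamingFileSink<GenericRecord> sink = StreamingFileSink
    .forBulkFormat(new Path(fileSinkPath),
        ParquetAvroWriters.forGenericRecord(parquetSinkSchema))
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

records.addSink(sink).setParallelism(1);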


--
From: yingbo yang 
Sent: Sunday, June 28, 2020 17:38
To: user-zh 
Subject: Problem writing data with ParquetAvroWriters and a schema in Flink 1.10

Hi:
When writing parquet files with ParquetAvroWriters.forGenericRecord(Schema schema), I hit a class cast exception.
Below is my code:

// transform to DataStream
// TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(GenericData.Record.class,
//     BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO);
DataStream testDataStream = flinkTableEnv.toAppendStream(test, tupleTypeInfo);
testDataStream.print().setParallelism(1);

ArrayList fields = new ArrayList();
fields.add(new org.apache.avro.Schema.Field("id",
    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
    "id", JsonProperties.NULL_VALUE));
fields.add(new org.apache.avro.Schema.Field("time",
    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
    "time", JsonProperties.NULL_VALUE));
org.apache.avro.Schema parquetSinkSchema =
    org.apache.avro.Schema.createRecord("pi", "flinkParquetSink",
        "flink.parquet", true, fields);
String fileSinkPath = "./xxx.text/rs6/";

StreamingFileSink parquetSink = StreamingFileSink
    .forBulkFormat(new Path(fileSinkPath),
        ParquetAvroWriters.forGenericRecord(parquetSinkSchema))
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();
testDataStream.addSink(parquetSink).setParallelism(1);
flinkTableEnv.execute("ReadFromKafkaConnectorWriteToLocalFileJava");


Below is the exception:

09:29:50,283 INFO  org.apache.flink.runtime.taskmanager.Task
  - Sink: Unnamed (1/1) (79505cb6ab2df38886663fd99461315a)
switched from RUNNING to FAILED.09:29:50,283 INFO
org.apache.flink.runtime.taskmanager.Task - Sink:
Unnamed (1/1) (79505cb6ab2df38886663fd99461315a) switched from RUNNING
to FAILED.java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to
org.apache.avro.generic.IndexedRecord

at org.apache.avro.generic.GenericData.getField(GenericData.java:697)

at 
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:188)

at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)

at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)

at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)

at 
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445)

at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)

at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)09:29:50,284

INFO  org.apache.flink.runtime.taskmanager.Task -
Freeing task resources for Sink: Unnamed (1/1)
(79505cb6ab2df38886663fd99461315a).09:29:50,285 INFO
org.apache.flink.runtime.taskmanager.Task -
Ensuring all FileSystem streams are closed for task Sink: Unnamed
(1/1) (79505cb6ab2df38886663fd99461315a) [FAILED]09:29:50,289

INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor-
Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Unnamed (1/1)
79505cb6ab2df38886663fd99461315a.09:29:50,293 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink:
Unnamed (1/1) (79505cb6ab2df38886663fd99461315a)

switched from RUNNING to FAILED.java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to
org.apache.avro.generic.IndexedRecord at

Problem writing data with ParquetAvroWriters and a schema in Flink 1.10

2020-06-28 Thread yingbo yang
Hi:
When writing parquet files with ParquetAvroWriters.forGenericRecord(Schema schema), I hit a class cast exception.
Below is my code:

// transform to DataStream
// TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(GenericData.Record.class,
//     BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO);
DataStream testDataStream = flinkTableEnv.toAppendStream(test, tupleTypeInfo);
testDataStream.print().setParallelism(1);

ArrayList fields = new ArrayList();
fields.add(new org.apache.avro.Schema.Field("id",
    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
    "id", JsonProperties.NULL_VALUE));
fields.add(new org.apache.avro.Schema.Field("time",
    org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING),
    "time", JsonProperties.NULL_VALUE));
org.apache.avro.Schema parquetSinkSchema =
    org.apache.avro.Schema.createRecord("pi", "flinkParquetSink",
        "flink.parquet", true, fields);
String fileSinkPath = "./xxx.text/rs6/";

StreamingFileSink parquetSink = StreamingFileSink
    .forBulkFormat(new Path(fileSinkPath),
        ParquetAvroWriters.forGenericRecord(parquetSinkSchema))
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();
testDataStream.addSink(parquetSink).setParallelism(1);
flinkTableEnv.execute("ReadFromKafkaConnectorWriteToLocalFileJava");


Below is the exception:

09:29:50,283 INFO  org.apache.flink.runtime.taskmanager.Task
  - Sink: Unnamed (1/1) (79505cb6ab2df38886663fd99461315a)
switched from RUNNING to FAILED.09:29:50,283 INFO
org.apache.flink.runtime.taskmanager.Task - Sink:
Unnamed (1/1) (79505cb6ab2df38886663fd99461315a) switched from RUNNING
to FAILED.java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to
org.apache.avro.generic.IndexedRecord

at org.apache.avro.generic.GenericData.getField(GenericData.java:697)

at 
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:188)

at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)

at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)

at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)

at 
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:274)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:445)

at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)

at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)09:29:50,284

INFO  org.apache.flink.runtime.taskmanager.Task -
Freeing task resources for Sink: Unnamed (1/1)
(79505cb6ab2df38886663fd99461315a).09:29:50,285 INFO
org.apache.flink.runtime.taskmanager.Task -
Ensuring all FileSystem streams are closed for task Sink: Unnamed
(1/1) (79505cb6ab2df38886663fd99461315a) [FAILED]09:29:50,289

INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor-
Un-registering task and sending final execution state FAILED to
JobManager for task Sink: Unnamed (1/1)
79505cb6ab2df38886663fd99461315a.09:29:50,293 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink:
Unnamed (1/1) (79505cb6ab2df38886663fd99461315a)

switched from RUNNING to FAILED.java.lang.ClassCastException:
org.apache.flink.api.java.tuple.Tuple2 cannot be cast to
org.apache.avro.generic.IndexedRecord at
org.apache.avro.generic.GenericData.getField(GenericData.java:697) at
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:188)
at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)


Is my usage wrong, or is something else the problem?


Re: High-availability cluster

2020-06-28 Thread 刘佳炜
If your company uses Hadoop, go with YARN; standalone is generally for single-machine testing and practice.



Sent from my iPhone


-- Original message --
From: 李军 

High-availability cluster

2020-06-28 Thread 李军
   May I ask which cluster setup you all use in production?
1. Standalone cluster
2. Yarn cluster

What is the reasoning? I am not sure how to choose.


2020-6-28
| |
李军
|
|
hold_li...@163.com
|

Re: Suspected leak of Flink JOB_MANAGER_LEADER_PATH znodes

2020-06-28 Thread Paul Lam
Hi,

The fact that HA metadata is not cleaned up automatically is actually an old problem; it will probably only be solved after the ZK HA logic is refactored. See the two tickets below for details [1][2].

However, even if Flink implemented automatic cleanup, it could not handle metadata left behind when a job exits for external reasons, so users still need to implement their own detection and cleanup mechanism.

1. https://issues.apache.org/jira/browse/FLINK-6522 

2. https://issues.apache.org/jira/browse/FLINK-10333 
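
Since that detection-and-cleanup mechanism is left to the user, a rough, hypothetical sketch of what a scanner could look like is shown below; the quorum address and leader root path are placeholders that must match your high-availability.zookeeper.* settings, and the actual delete is commented out on purpose:

import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class StaleLeaderZnodeScanner {
    public static void main(String[] args) throws Exception {
        // Placeholders: must match high-availability.zookeeper.quorum and the
        // high-availability.zookeeper.path.root / cluster-id of your setup.
        String quorum = "zk1:2181,zk2:2181,zk3:2181";
        String leaderRoot = "/flink/default/leader";

        ZooKeeper zk = new ZooKeeper(quorum, 30_000, event -> { });
        try {
            List<String> jobIds = zk.getChildren(leaderRoot, false);
            for (String jobId : jobIds) {
                String path = leaderRoot + "/" + jobId;
                // An empty child list usually means only the bookkeeping znode is left behind.
                // Cross-check against the jobs that are actually running before deleting anything.
                if (zk.getChildren(path, false).isEmpty()) {
                    System.out.println("cleanup candidate: " + path);
                    // zk.delete(path, -1);   // enable only after verification
                }
            }
        } finally {
            zk.close();
        }
    }
}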


Best,
Paul Lam

> On June 28, 2020 at 12:29, 于汝国 wrote:
> 
> 
> 
> 
> Flink itself does not provide a feature or mechanism to clean up the leftover znodes on ZooKeeper
> (and some of the data on HDFS) after a job is cancelled. If you want to clean them up, do it manually or implement it yourself.
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-06-28 09:12:41, "林恬" wrote:
>> Hi all:
>>   I am currently using Flink 1.9.2 with ZK-based HA. I noticed that the /leader/${job_id}
>> znode on ZK is not cleaned up even when the job is cancelled, so after running for a while there are many empty job_id znodes under /leader/. When is this supposed to be cleaned up? Or is the fact that it is not cleaned up a bug in 1.9.2?
>> 
>> 
>> 



Convert sql table with field of type MULTISET to datastream with field of type java.util.Map[T, java.lang.Integer]

2020-06-28 Thread YI
Hi, all

I am trying to do something like this
```
tEnv
  .sqlQuery("SELECT rawEvent.id, collect(rawEvent.name) FROM rawEvent GROUP BY rawEvent.id")
  .toRetractStream[(Long, java.util.Map[String, java.lang.Integer])]
```

An exception is thrown when I run the above code with the default planner setting in 1.10.1, so I presume I am using the old planner.

```
Exception in thread "main" org.apache.flink.table.api.TableException: Result 
field does not match requested type. Requested: GenericType; 
Actual: Multiset
at 
org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2(Conversions.scala:104)
at 
org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2$adapted(Conversions.scala:98)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at 
org.apache.flink.table.planner.Conversions$.generateRowConverterFunction(Conversions.scala:98)
at 
org.apache.flink.table.planner.DataStreamConversions$.getConversionMapperWithChanges(DataStreamConversions.scala:184)
at 
org.apache.flink.table.planner.DataStreamConversions$.convert(DataStreamConversions.scala:90)
at 
org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:413)
at 
org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:402)
at 
org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
at 
org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:273)
at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:127)
at 
org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
at io.redacted.sub.package$.getMatchesWithParent(package.scala:244)
at io.redacted.sub.package$.process(package.scala:156)
at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
at io.redacted.DataAggregator.main(DataAggregator.scala)

Process finished with exit code 1
```

The result type of aggregation function collect is multiset. How do I convert 
it to a `java.util.Map[String, java.lang.Integer]`?
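
A possible workaround (a hedged sketch, not from this thread): convert the table to a retract stream of Row first, then pull the MULTISET field out of the Row as a java.util.Map. This assumes the legacy planner materializes the multiset value as a java.util.Map[String, java.lang.Integer] inside the Row (MultisetTypeInfo is a map type internally); tEnv and rawEvent are the ones from the question above.

```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val result: DataStream[(Boolean, (Long, java.util.Map[String, java.lang.Integer]))] = tEnv
  .sqlQuery("SELECT rawEvent.id, collect(rawEvent.name) FROM rawEvent GROUP BY rawEvent.id")
  .toRetractStream[Row]
  .map { case (retract, row) =>
    // Field 0 is the id, field 1 is the multiset materialized as a java.util.Map.
    val id = row.getField(0).asInstanceOf[java.lang.Long].longValue()
    val names = row.getField(1).asInstanceOf[java.util.Map[String, java.lang.Integer]]
    (retract, (id, names))
  }
```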

Cheers,
YI

Dynamic source and sink.

2020-06-28 Thread C DINESH
Hi All,

In a Flink job I have a pipeline that consumes data from one Kafka topic
and writes it to an Elasticsearch cluster.

Without restarting the job, can we add another Kafka cluster as a source and
another Elasticsearch sink to the job? That is, I would supply the new Kafka
cluster and Elasticsearch details through the topic; after consuming that data,
can the Flink job add the new source and sink to the same running job?


Thanks & Regards,
Dinesh.


Re: Re: [Exactly-once semantics at the Flink sink]

2020-06-28 Thread Jingsong Li
Hi,

To add to Benchao's points:
- Besides Kafka, the StreamingFileSink is also exactly-once, with no duplicates and no data loss.
- For MySQL, ES and other sinks that support updates by primary key, under upsert semantics (e.g. a count(*) from t group by) the data is eventually consistent, so I would also regard that as effectively exactly-once, with no duplicates and no loss (see the sketch below).
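
As a minimal sketch of that upsert idea for MySQL (illustrative table and column names, plain JDBC, no batching or retries): the target table is assumed to have a primary key on word, so replaying the same record under at-least-once delivery simply overwrites the previous value instead of duplicating it.

```
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class UpsertMySqlSink(url: String, user: String, password: String)
    extends RichSinkFunction[(String, Long)] {

  @transient private var conn: Connection = _
  @transient private var stmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(url, user, password)
    stmt = conn.prepareStatement(
      "INSERT INTO word_count (word, cnt) VALUES (?, ?) ON DUPLICATE KEY UPDATE cnt = VALUES(cnt)")
  }

  // The simple (deprecated) invoke variant keeps the sketch short.
  override def invoke(value: (String, Long)): Unit = {
    stmt.setString(1, value._1)
    stmt.setLong(2, value._2)
    stmt.executeUpdate()
  }

  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}
```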

Best,
Jingsong Lee

On Mon, Jun 22, 2020 at 11:46 AM 程龙 <13162790...@163.com> wrote:

> You need to implement it yourself, e.g. with idempotent writes, for example keyed on an operation identifier.
>
> On 2020-06-22 10:04:43, "Benchao Li"  wrote:
> >It looks like only Kafka implements TwoPhaseCommitSinkFunction at the moment, so currently only Kafka supports exactly-once.
> >
> >But for sinks like MySQL and ES that can update by primary key, at-least-once should be good enough.
> >
> >忝忝向仧 <153488...@qq.com> wrote on Sunday, Jun 21, 2020 at 11:27 PM:
> >
> >> Hi all:
> >>
> >>
> >> For Flink connectors, when sinking to MySQL, ES, etc., is there an implementation with exactly-once semantics?
> >> For example, the Kafka connector has an exactly-once sink; you just specify it when creating the sink.
> >> Is there anything similar for MySQL or the others?
> >> Thanks.
> >> return new FlinkKafkaProducer011<>(
> >> "topic",
> >> new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
> >> producerProperties,
> >> FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
> >
> >
> >
> >--
> >
> >Best,
> >Benchao Li
>


-- 
Best, Jingsong Lee


Re: flinksql

2020-06-28 Thread Jingsong Li
Hi,

Note that before 1.11, the Flink SQL client can only create Flink tables, not Hive tables.
A short statement such as create table t (i int, j int); does not give you a usable Flink table; a complete Flink table needs the WITH connector options [1] (a rough example follows below).
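
As a rough illustration (not from this thread) of what such a complete table definition looks like in 1.10, here is a sketch that issues the DDL through a StreamTableEnvironment; the filesystem/CSV options are placeholder assumptions from memory, so check the connector documentation [1] for the exact keys.

```
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// The WITH clause is what turns the bare schema into a usable Flink table.
tEnv.sqlUpdate(
  """
    |CREATE TABLE t (
    |  i INT,
    |  j INT
    |) WITH (
    |  'connector.type' = 'filesystem',
    |  'connector.path' = 'file:///tmp/t.csv',
    |  'format.type' = 'csv'
    |)
  """.stripMargin)
```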

Only with the Hive dialect supported in 1.11 can you create a Hive table with a simple DDL such as create table t (i int, j int);.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html

Best,
Jingsong Lee

On Wed, Jun 24, 2020 at 8:44 PM Rui Li  wrote:

> Do you want to create a Hive table through the Flink SQL
>
> client? In 1.10 you probably need to add 'is_generic'='false' to the table properties; otherwise the created table is a generic (non-Hive) table by default and Hive cannot read it.
>
> On Mon, Jun 22, 2020 at 3:53 PM Leonard Xu  wrote:
>
> > Hi,
> > This error usually means a connector jar is missing, or the connector's WITH options are filled in incorrectly.
> > > the table created in the flink sql-client
> > What table is this? Could you post the CREATE TABLE statement so we can take a look?
> >
> > Best,
> > Leonard Xu
>
>
>
> --
> Best regards!
> Rui Li
>


-- 
Best, Jingsong Lee


Re: Flink-1.10.0 source checkpoints occasionally take a long time

2020-06-28 Thread Tianwang Li
>
> For checkpoint timeouts that only show up once every day or two, check whether some class of keys in your job suddenly grows during those one or two days.
>
I will measure the time each task spends processing window data and keep observing.
This is a test job with no sink output:
source -> window -> window (counts the records emitted by the previous window, printing about 10 records)

LakeShen  wrote on Sunday, Jun 28, 2020 at 10:35 AM:

> Hi Tianwang Li,
>
> For checkpoint timeouts that only show up once every day or two, check whether some class of keys in your job might suddenly grow during those one or two days.
>
> Best,
> LakeShen
>
> zhisheng  wrote on Sunday, Jun 28, 2020 at 10:27 AM:
>
> > hi, Tianwang Li
> >
> > The three images appear to be broken; try uploading them to a third-party image host and pasting the links here. Also:
> >
> > > The job frequently shows backpressure (especially while the windows are emitting).
> >
> > Check the operators downstream of the window, e.g. whether the window emits too much data while the sink parallelism stays the same, so processing cannot keep up and backpressure results.
> >
> >
> > > Most checkpoints complete within 1 minute, but occasionally a checkpoint takes more than 30 minutes (infrequent, about once every 1-2 days).
> >
> > For that, also check whether HDFS is occasionally under heavy load and causing the spikes.
> >
> > It would also help to attach the checkpoint-related screenshots and logs from the UI, so the problem can be located more precisely.
> >
> >
> > Best !
> > zhisheng
> >
> >
> > Tianwang Li  wrote on Sunday, Jun 28, 2020 at 10:17 AM:
> >
> > > About the problem of Flink checkpoints occasionally taking a very long time.
> > >
> > > *Environment and background:*
> > > Version: Flink 1.10.0
> > > Data volume: roughly 100,000 records per second; the source is Kafka.
> > > Computation: sliding-window aggregation; each window emits roughly 10-20 million records.
> > > Backpressure: the job frequently shows backpressure (especially while the windows are emitting).
> > >
> > > *Problem:*
> > > Most checkpoints complete within 1 minute, but occasionally a checkpoint takes more than 30 minutes (infrequent, about once every 1-2 days).
> > > The source's checkpoint takes a long time: the gap between "Trigger checkpoint" and "Starting checkpoint" is large.
> > >
> > > The checkpoint stats look roughly like this:
> > >
> > > [image: image.png]
> > > [image: image.png]
> > > [image: image.png]
> > >
> > > 2020-06-24 21:09:53,369 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor- Trigger
> > > checkpoint 316@1593004193363 for 84dce1ec8aa5a4df2d1758d6e9278693.
> > >
> > > 2020-06-24 21:09:58,327 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Received
> > > heartbeat request from e88ea2f790430c9c160e540ef0546d60.
> > >
> > > 2020-06-24 21:09:59,266 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Received
> > > heartbeat request from b93d7167db364dfdcbda886944f1482f.
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Memory usage stats: [HEAP: 202/2573/2573 MB, NON HEAP:
> > > 111/114/424 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > > Used Memory: 583911424
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:09:59,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> > > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> > >
> > > 2020-06-24 21:10:08,346 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Received
> > > heartbeat request from e88ea2f790430c9c160e540ef0546d60.
> > >
> > > 2020-06-24 21:10:09,286 DEBUG
> > > org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Received
> > > heartbeat request from b93d7167db364dfdcbda886944f1482f.
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Memory usage stats: [HEAP: 557/2573/2573 MB, NON HEAP:
> > > 111/114/424 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > > Used Memory: 583911424
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:10:09,686 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 108643, GC
> > > COUNT: 6981], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> > >
> > >
> > > (logs omitted)
> > >
> > >
> > > 2020-06-24 21:55:39,875 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Direct memory stats: Count: 17403, Total Capacity: 583911423,
> > > Used Memory: 583911424
> > >
> > > 2020-06-24 21:55:39,875 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Off-heap pool stats: [Code Cache: 35/35/240 MB
> > > (used/committed/max)], [Metaspace: 67/69/96 MB (used/committed/max)],
> > > [Compressed Class Space: 8/9/88 MB (used/committed/max)]
> > >
> > > 2020-06-24 21:55:39,876 INFO
> > org.apache.flink.runtime.taskexecutor.TaskManagerRunner
> > >   - Garbage collector stats: [PS Scavenge, GC TIME (ms): 110520, GC
> > > COUNT: 7083], [PS MarkSweep, GC TIME (ms): 1074, GC COUNT: 6]
> > >
> > > 2020-06-24 21:55:41,721 DEBUG
> > >