Re: flinksql receives a field value in the format 2020-09-23T20:58:24+08:00, how to convert it to TIMESTAMP

2020-09-23 Thread Joker
Sorry to jump in with a question. I generate the event-time column as ts AS TO_TIMESTAMP(FROM_UNIXTIME(create_time / 1000, 'yyyy-MM-dd 
HH:mm:ss')), and I find the watermark is always 8 hours ahead of Beijing time. For example, with create_time 
= 1600926591666, ts is computed as 2020/9/24 13:49:51, which is correct, but the Web UI shows the extracted watermark as 2020/9/24 
21:49:51.


Joker
gaojintao...@163.com


On 2020-09-24 13:40, Jark Wu wrote:
Flink's TO_TIMESTAMP function uses Java SimpleDateFormat to parse the time format, so you can check the
SimpleDateFormat javadoc.
You can try to_timestamp('2020-09-23T20:58:24+08:00',
'yyyy-MM-dd''T''HH:mm:ssXXX') to parse your data.

Best,
Jark

On Wed, 23 Sep 2020 at 21:08, chenxuying  wrote:

The Flink SQL version is 1.11.2.
The source receives the field as a time value of type STRING:
CREATE TABLE sourceTable (
`time` STRING
) WITH(
...
);


The sink is as follows:
CREATE TABLE sinktable (
`time1` STRING,
`time` TIMESTAMP(3)
) WITH (
'connector' = 'print'
);


The INSERT statement (I don't know how to correctly change TO_TIMESTAMP's default format):
insert into sinktable select
`time`,TO_TIMESTAMP(`time`,'yyyy-MM-ddTHH:mm:ss+08:00') from sourceTable


The error says the format is wrong:
Caused by: java.lang.IllegalArgumentException: Unknown pattern letter: T
at
java.time.format.DateTimeFormatterBuilder.parsePattern(DateTimeFormatterBuilder.java:1663)
at
java.time.format.DateTimeFormatterBuilder.appendPattern(DateTimeFormatterBuilder.java:1572)
at java.time.format.DateTimeFormatter.ofPattern(DateTimeFormatter.java:534)

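A quick standalone check (not from the thread) of the pattern Jark suggests: the stack trace above
points at java.time's DateTimeFormatter, where the literal T must be quoted and XXX matches the
+08:00 offset. Inside a SQL string literal the quotes around T are doubled, hence ''T''.

import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class PatternCheck {
    public static void main(String[] args) {
        // 'T' is a quoted literal; XXX parses the +08:00 zone offset
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX");
        OffsetDateTime t = OffsetDateTime.parse("2020-09-23T20:58:24+08:00", f);
        System.out.println(t); // prints 2020-09-23T20:58:24+08:00
    }
}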

Re: flinksql receives a field value in the format 2020-09-23T20:58:24+08:00, how to convert it to TIMESTAMP

2020-09-23 Thread Jark Wu
Flink's TO_TIMESTAMP function uses Java SimpleDateFormat to parse the time format, so you can check the
SimpleDateFormat javadoc.
You can try to_timestamp('2020-09-23T20:58:24+08:00',
'yyyy-MM-dd''T''HH:mm:ssXXX') to parse your data.

Best,
Jark

On Wed, 23 Sep 2020 at 21:08, chenxuying  wrote:

> The Flink SQL version is 1.11.2.
> The source receives the field as a time value of type STRING:
> CREATE TABLE sourceTable (
>  `time` STRING
>  ) WITH(
> ...
>  );
>
>
> The sink is as follows:
> CREATE TABLE sinktable (
> `time1` STRING,
> `time` TIMESTAMP(3)
>  ) WITH (
>  'connector' = 'print'
>  );
>
>
> The INSERT statement (I don't know how to correctly change TO_TIMESTAMP's default format):
>  insert into sinktable select
> `time`,TO_TIMESTAMP(`time`,'yyyy-MM-ddTHH:mm:ss+08:00') from sourceTable
>
>
> The error says the format is wrong:
> Caused by: java.lang.IllegalArgumentException: Unknown pattern letter: T
> at
> java.time.format.DateTimeFormatterBuilder.parsePattern(DateTimeFormatterBuilder.java:1663)
> at
> java.time.format.DateTimeFormatterBuilder.appendPattern(DateTimeFormatterBuilder.java:1572)
> at java.time.format.DateTimeFormatter.ofPattern(DateTimeFormatter.java:534)


Re: How to disconnect taskmanager via rest api?

2020-09-23 Thread Yang Wang
I think this is an interesting feature, especially when deploying Flink
standalone clusters on K8s.
The TaskManager pods are started/stopped externally via kubectl or other
tools. When we need to stop
a TaskManager pod, even though the pod is deleted quickly, we have to wait
for a timeout before it
disappears from the JobManager.

Only the ActiveResourceManager (e.g. YarnResourceManager,
KubernetesResourceManager) has
the ability to allocate/release TaskManagers. Maybe in standalone mode
we need to make it possible for
TaskManagers to deregister themselves when they stop.


Best,
Yang


Luan Cooper  wrote on Wed, Sep 23, 2020 at 4:30 PM:

> thanks
> I'll create a new issue for this feature on github
>
>
> On Mon, Sep 21, 2020 at 11:51 PM Timo Walther  wrote:
>
>> Hi Luan,
>>
>> this sound more of a new feature request to me. Maybe you can already
>> open an issue for it.
>>
>> I will loop in Chesnay in CC if there is some possibility to achieve
>> this already?
>>
>> Regards,
>> Timo
>>
>> On 21.09.20 06:37, Luan Cooper wrote:
>> > Hi
>> >
>> > We're running flink standalone cluster on k8s
>> > when deleting a taskmanager pod manually, jobmanager *should disconnect
>> > it immediately*
>> >
>> > however no such rest api available right now
>> > we have to wait `akka.tcp.timeout` which means 30s later or more
>> >
>> > What if I want to disconnect tm via rest api
>> > Which way did you suggest ?
>> >
>> > 1. add disconnectTaskManager to
>> > org.apache.flink.runtime.dispatcher.Dispatcher
>> > which means a new Interface
>> >
>> > CompletableFuture disconnectTaskManager(JobID jobId, ResourceID resourceId);
>> >
>> > in org.apache.flink.runtime.webmonitor.RestfulGateway
>> >
>> > 2. Any other suggestions?
>> >
>> > Thanks
>>
>>


Re: [DISCUSS] Move Hive document to "Table & SQL Connectors" from "Table API & SQL"

2020-09-23 Thread nashcen
+1



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: metaspace out-of-memory & error while retrieving the leader gateway

2020-09-23 Thread Xintong Song
How many slots do you have on each task manager?

Flink uses ChildFirstClassLoader for loading user code, to avoid
dependency conflicts between user code and Flink's framework. Ideally,
after a slot is freed and reassigned to a new job, the user class loaders
of the previous job should be unloaded. 33 instances of them do not
sound right. It might be worth looking into where the references that keep
these instances alive come from.

Flink 1.10.3 is not released yet. If you want to try the unreleased
version, you would need to download the sources [1], build the flink
distribution [2] and build your custom image (from the 1.10.2 image,
replacing the flink distribution with the one you built).

Thank you~

Xintong Song


[1] https://github.com/apache/flink/tree/release-1.10

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/flinkDev/building.html



On Wed, Sep 23, 2020 at 8:29 PM Claude M  wrote:

> It was mentioned that this issue may be fixed in 1.10.3 but there is no
> 1.10.3 docker image here: https://hub.docker.com/_/flink
>
>
> On Wed, Sep 23, 2020 at 7:14 AM Claude M  wrote:
>
>> In regards to the metaspace memory issue, I was able to get a heap dump
>> and the following is the output:
>>
>> Problem Suspect 1
>> One instance of *"java.lang.ref.Finalizer"* loaded by *"<system class loader>"* occupies *4,112,624 (11.67%)* bytes. The instance is
>> referenced by *sun.misc.Cleaner @ 0xb5d6b520*, loaded by *"<system class loader>"*. The memory is accumulated in one instance of
>> *"java.lang.Object[]"* loaded by *"<system class loader>"*.
>>
>> Problem Suspect 2
>> 33 instances of *"org.apache.flink.util.ChildFirstClassLoader"*, loaded
>> by *"sun.misc.Launcher$AppClassLoader @ 0xb4068680"* occupy *6,615,416
>> (18.76%)*bytes.
>>
>> Based on this, I'm not clear on what needs to be done to solve this.
>>
>>
>> On Tue, Sep 22, 2020 at 3:10 PM Claude M  wrote:
>>
>>> Thanks for your responses.
>>> 1.  There were no job re-starts prior to the metaspace OEM.
>>> 2.  I tried increasing the CPU request and still encountered the
>>> problem.  Any configuration change I make to the job manager, whether it's
>>> in the flink-conf.yaml or increasing the pod's CPU/memory request, results
>>> with this problem.
>>>
>>>
>>> On Tue, Sep 22, 2020 at 12:04 AM Xintong Song 
>>> wrote:
>>>
 Thanks for the input, Brian.

 This looks like what we are looking for. The issue is fixed in 1.10.3,
 which also matches the fact that this problem occurred in 1.10.2.

 Maybe Claude can further confirm it.

 Thank you~

 Xintong Song



 On Tue, Sep 22, 2020 at 10:57 AM Zhou, Brian  wrote:

> Hi Xintong and Claude,
>
>
>
> In our internal tests, we also encounter these two issues and we spent
> much time debugging them. There are two points I need to confirm if we
> share the same problem.
>
>1. Your job is using default restart strategy, which is per-second
>restart.
>2. Your CPU resource on jobmanager might be small
>
>
>
> Here is some findings I want to share.
>
> ## Metaspace OOM
>
> Due to https://issues.apache.org/jira/browse/FLINK-15467 , when we
> have some job restarts, there will be some threads from the sourceFunction
> hanging, so the class loader cannot be closed. New restarts would load new
> classes, then expand the metaspace, and finally the OOM happens.
>
>
>
> ## Leader retrieving
>
> Constant restarts may be heavy for the jobmanager; if JM CPU resources are
> not enough, the thread for leader retrieving may get stuck.
>
>
>
> Best Regards,
>
> Brian
>
>
>
> *From:* Xintong Song 
> *Sent:* Tuesday, September 22, 2020 10:16
> *To:* Claude M; user
> *Subject:* Re: metaspace out-of-memory & error while retrieving the
> leader gateway
>
>
>
> ## Metaspace OOM
>
> As the error message already suggested, the metaspace OOM you
> encountered is likely caused by a class loading leak. I think you are on
> the right direction trying to look into the heap dump and find out where
> the leak comes from. IIUC, after removing the ZK folder, you are now able
> to run Flink with the heap dump options.
>
>
>
> The problem does not occur in previous versions because Flink starts
> to set the metaspace limit since the 1.10 release. The class loading leak
> might have already been there, but is never discovered. This could lead to
> unpredictable stability and performance issues. That's why Flink updated
> its memory model and explicitly set the metaspace limit in the 1.10 
> release.
>
>
>
> ## Leader retrieving
>
> The command looks good to me. If this problem happens only once, it
> could be irrelevant to adding the options. If that does not block you from
> getting the heap dump, we can look into it later.
>
>
> Thank 

Re: Error on deploying Flink docker image with Kubernetes (minikube) and automatically launch a stream WordCount job.

2020-09-23 Thread Yang Wang
Hi Felipe,

Currently, if you want to deploy a standalone job/application Flink cluster
on K8s via yamls,
you should have your own image with the user jar baked in at
/opt/flink/usrlib. It cannot be
specified via a config option. Usually, you add a new layer on top of the
official docker image to
build in the user jar.

A more graceful solution is an init container [1], which could download the
user jar from remote
storage or just copy it from the local directory /opt/flink/opt to
/opt/flink/usrlib.

[1].
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-initialization/

Best,
Yang


Felipe Gutierrez  wrote on Fri, Sep 18, 2020 at 5:52 PM:

> Hi community,
>
> I am trying to deploy the default WordCount stream application on
> minikube using the official documentation at [1]. I am using minikube
> v1.13.0 on Ubuntu 18.04 and Kubernetes v1.19.0 on Docker 19.03.8. I
> could sucessfully start 1 job manager and 3 task managers using the
> yaml files flink-configuration-configmap.yaml,
> jobmanager-service.yaml, jobmanager-rest-service.yaml,
> jobmanager-session-deployment.yaml, and
> taskmanager-session-deployment.yaml (all available on the Apendix of
> this link [1]).
>
> Then I would like to start the word-count stream job available on the
> flink jar image [2], which I believe is available since it is built
> inside the default flink jar distribution. What I understood that I
> have to do is to create the objects based on the files
> jobmanager-job.yaml and taskmanager-job-deployment.yaml (also
> available on the link [1]). And, I think that I have to replace this
> line below on the object jobmanager-job.yaml
> (spec.template.spec.containers.name[jobmanager]):
>
> args: ["standalone-job", "--job-classname",
> "org.apache.flink.streaming.examples.wordcount.WordCount"]
>
> Is this correct? I am not sure if this is my entire error. I am
> getting the message "Could not find the provided job class
> (org.apache.flink.streaming.examples.wordcount.WordCount) in the user
> lib directory (/opt/flink/usrlib)". As far as I know
> "/opt/flink/usrlib" is the default directory. I am not sure if I have
> to change the property: path /host/path/to/job/artifacts. This is my
> log message of the pod error.
> Do you guys have any idea of what I am missing in my configuration?
>
> Thanks, Felipe
>
> $ kubectl get pods
> NAME READY   STATUS RESTARTS   AGE
> flink-jobmanager-ftgg9   0/1 CrashLoopBackOff   3  83s
> $ kubectl logs flink-jobmanager-ftgg9
> Starting Job Manager
> sed: couldn't open temporary file /opt/flink/conf/sedA699Jt: Read-only
> file system
> sed: couldn't open temporary file /opt/flink/conf/sedvZhs0w: Read-only
> file system
> /docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create
> /opt/flink/conf/flink-conf.yaml: Permission denied
> /docker-entrypoint.sh: 91: /docker-entrypoint.sh: cannot create
> /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
> Starting standalonejob as a console application on host
> flink-jobmanager-ftgg9.
> 2020-09-18 09:15:58,801 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>
> 
> 2020-09-18 09:15:58,804 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> Preconfiguration:
> 2020-09-18 09:15:58,804 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>
>
> JM_RESOURCE_PARAMS extraction logs:
> jvm_params: -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456
> logs: INFO  [] - Loading configuration property:
> jobmanager.rpc.address, flink-jobmanager
> INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 4
> INFO  [] - Loading configuration property: blob.server.port, 6124
> INFO  [] - Loading configuration property: jobmanager.rpc.port, 6123
> INFO  [] - Loading configuration property: taskmanager.rpc.port, 6122
> INFO  [] - Loading configuration property: queryable-state.proxy.ports,
> 6125
> INFO  [] - Loading configuration property: jobmanager.memory.process.size,
> 1600m
> INFO  [] - Loading configuration property:
> taskmanager.memory.process.size, 1728m
> INFO  [] - Loading configuration property: parallelism.default, 2
> INFO  [] - The derived from fraction jvm overhead memory (160.000mb
> (167772162 bytes)) is less than its min value 192.000mb (201326592
> bytes), min value will be used instead
> INFO  [] - Final Master Memory configuration:
> INFO  [] -   Total Process Memory: 1.563gb (1677721600 bytes)
> INFO  [] - Total Flink Memory: 1.125gb (1207959552 bytes)
> INFO  [] -   JVM Heap: 1024.000mb (1073741824 bytes)
> INFO  [] -   Off-heap: 128.000mb (134217728 bytes)
> INFO  [] - JVM Metaspace:  256.000mb (268435456 bytes)
> INFO  [] - JVM Overhead:   192.000mb (201326592 bytes)
>
> 2020-09-18 09:15:58,805 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>
> 

Re: [DISCUSS] Move Hive document to "Table & SQL Connectors" from "Table API & SQL"

2020-09-23 Thread Jark Wu
+1 to move it there.

On Thu, 24 Sep 2020 at 12:16, Jingsong Li  wrote:

> Hi devs and users:
>
> After the 1.11 release, I have heard some complaints recently: why can't Hive's
> documents be found in the "Table & SQL Connectors" section?
>
> Actually, Hive's documents are in the "Table API & SQL". Since the "Table &
> SQL Connectors" document was extracted separately, Hive is a little out of
> place.
> And Hive's code is also in "flink-connector-hive", which should be a
> connector.
> Hive also includes the concept of HiveCatalog. Is catalog a part of the
> connector? I think so.
>
> What do you think? If you don't object, I think we can move it.
>
> Best,
> Jingsong Lee
>


Re: Adaptive load balancing

2020-09-23 Thread Zhijiang
Hi Krishnan,

Thanks for discussing this interesting scenario!

It reminds me of a previously pending improvement: adaptive load balancing
for the rebalance partitioner.
Since the rebalance mode can emit data to any node without precision
requirements, the data can be emitted adaptively based on the current backlog of each
partition, which reflects the load condition of the consumers to some extent.

For your keyBy case, I guess the requirement is not only load balancing
of processing, but also consistency with the preloaded cache.
Do you think it is possible to implement a custom partitioner that
controls the keyBy distribution based on the pre-defined cache distribution
across nodes?

Best,
Zhijiang
--
From: Navneeth Krishnan 
Send Time: Wednesday, Sep 23, 2020 02:21
To: user 
Subject: Adaptive load balancing

Hi All,

We are currently using flink in production and use keyBy for performing a CPU 
intensive computation. There is a cache lookup for a set of keys and since 
keyBy cannot guarantee the data is sent to a single node we are basically 
replicating the cache on all nodes. This is causing more memory problems for us 
and we would like to explore some options to mitigate the current limitations.

Is there a way to group a set of keys and send to a set of nodes so that we 
don't have to replicate the cache data on all nodes?

Has someone tried implementing hashing with adaptive load balancing, so that if 
a node is busy processing, the data can be routed effectively to other 
nodes which are free?

Any suggestions are greatly appreciated.

Thanks

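A minimal sketch (not from the thread) of the custom partitioner Zhijiang suggests: route records
with DataStream#partitionCustom so that every key assigned to the same pre-defined cache group lands
on the same subtask. Event and its cacheGroup field are hypothetical names; note that a
custom-partitioned stream does not give you keyed state the way keyBy does.

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;

public class CacheAwareRouting {

    public static class Event {
        public int cacheGroup;   // precomputed from the cache layout across nodes
        public String payload;
    }

    // maps a record to the id of the cache group that preloaded its reference data
    public static class CacheGroupSelector implements KeySelector<Event, Integer> {
        @Override
        public Integer getKey(Event event) {
            return event.cacheGroup;
        }
    }

    // sends all records of one cache group to the same downstream subtask
    public static class CacheGroupPartitioner implements Partitioner<Integer> {
        @Override
        public int partition(Integer cacheGroup, int numPartitions) {
            return Math.abs(cacheGroup % numPartitions);
        }
    }

    public static DataStream<Event> route(DataStream<Event> events) {
        return events.partitionCustom(new CacheGroupPartitioner(), new CacheGroupSelector());
    }
}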


[DISCUSS] Move Hive document to "Table & SQL Connectors" from "Table API & SQL"

2020-09-23 Thread Jingsong Li
Hi devs and users:

After the 1.11 release, I have heard some complaints recently: why can't Hive's
documents be found in the "Table & SQL Connectors" section?

Actually, Hive's documents are in the "Table API & SQL". Since the "Table &
SQL Connectors" document was extracted separately, Hive is a little out of
place.
And Hive's code is also in "flink-connector-hive", which should be a
connector.
Hive also includes the concept of HiveCatalog. Is catalog a part of the
connector? I think so.

What do you think? If you don't object, I think we can move it.

Best,
Jingsong Lee


Re: Flink-1.11 sql-client yaml configuration issue

2020-09-23 Thread Rui Li
Hi, this looks like a missing Hive connector dependency. Which jars have you added under lib?

On Thu, Sep 24, 2020 at 11:00 AM nashcen <2415370...@qq.com> wrote:

> I plan to use the command-line tool $FLINK_HOME/bin/sql-client.sh embedded
> to start the Flink SQL client and connect to Hive.
>
>
> In the Flink SQL configuration file sql-client-defaults.yaml,
> I added the following parameters:
> catalogs:
>   - name: myhive
>     type: hive
>     hive-conf-dir: /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/conf
>     default-database: dc_stg
>
> It fails on startup; here is the error message:
>
> Reading default environment from:
>
> file:/bigdata/athub/app/bigdata/flink/flink-1.11.1/conf/sql-client-defaults.yaml
> No session environment specified.
>
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException:
> Unexpected exception. This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
> at
>
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
> at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could
> not find a suitable table factory for
> 'org.apache.flink.table.factories.CatalogFactory' in
> the classpath.
>
> Reason: Required context properties mismatch.
>
> The following properties are requested:
> default-database=dc_stg
>
> hive-conf-dir=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/conf
> type=hive
>
> The following factories have been considered:
> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
> at
>
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
> at
>
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
> at
>
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> at
>
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:377)
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:626)
> at java.util.HashMap.forEach(HashMap.java:1289)
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:625)
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:264)
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:624)
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:183)
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:136)
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
> ... 3 more
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Re: When a field is added in Kafka, how should the Hive table be handled

2020-09-23 Thread Rui Li
Hi,

What exact problem do you run into when adding the field directly to the Hive table? Please paste the stack trace.

On Wed, Sep 23, 2020 at 6:50 PM china_tao  wrote:

> Flink 1.11.1, Flink SQL. I have already implemented Flink SQL
> reading from Kafka and writing to Hive. The problem now is that a field was added to the Kafka source; how should
> the Hive table in Flink SQL be modified? If I add the field directly in Hive, every startup reports that the Hive
> table already exists, and if I use drop table if exists, the historical data is lost. How do you all handle this? Thanks.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li


flink on yarn NodeManager JVM memory

2020-09-23 Thread superainbower
Hi everyone,
I have a Flink job running on YARN with RocksDB as the state backend. Since this is a test, I repeatedly started and stopped the job over a period of time. Later I noticed a GC-time-over-threshold alarm on a NodeManager of the YARN cluster (with no other error logs), and on the corresponding node the
NodeManager's JVM heap was almost full (1.5G). The chart shows the heap growing gradually (roughly matching the time span of my Flink tests), with GC pauses reaching 30+ seconds. After stopping the Flink job the JVM heap never came down, and I could only restart the YARN cluster.
My question: Flink on YARN is given TaskManager memory
and JobManager memory, so how can it still affect the NodeManager's JVM memory? Also, the job has been stopped but the heap does not come down; could this be related to RocksDB?


superainbower
superainbo...@163.com



Re: flink sql late data

2020-09-23 Thread ang
Thanks Benchao, I will try the config approach for the SQL job.







------------------ Original message ------------------
From: "user-zh"

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

ang <806040...@qq.com> wrote on Wed, Sep 23, 2020 at 4:24 PM:

 hi
 I am using Flink SQL to consume Kafka with event time and a 5s window. The watermark is declared as
 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
 and data arriving more than 5s late is discarded. The DataStream API has allowed
 lateness; how can the same thing be configured in SQL?


 Flink 1.10.1



-- 

Best,
Benchao Li
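
For reference (not from the thread): the idle state retention setting behind the link above can also
be set programmatically, roughly like this on Flink 1.10, assuming a StreamTableEnvironment named
tableEnv.

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionExample {
    public static void configure(StreamTableEnvironment tableEnv) {
        // state of a key that has been idle for longer than the max time may be cleaned up
        tableEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }
}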

Flink-1.11 sql-client yaml configuration issue

2020-09-23 Thread nashcen
I plan to use the command-line tool $FLINK_HOME/bin/sql-client.sh embedded
to start the Flink SQL client and connect to Hive.


In the Flink SQL configuration file sql-client-defaults.yaml,
I added the following parameters:
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/conf
    default-database: dc_stg

It fails on startup; here is the error message:

Reading default environment from:
file:/bigdata/athub/app/bigdata/flink/flink-1.11.1/conf/sql-client-defaults.yaml
No session environment specified.


Exception in thread "main" org.apache.flink.table.client.SqlClientException:
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
Could not create execution context.
at
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
at
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could
not find a suitable table factory for
'org.apache.flink.table.factories.CatalogFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
default-database=dc_stg
hive-conf-dir=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/conf
type=hive

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
at
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:377)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:626)
at java.util.HashMap.forEach(HashMap.java:1289)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:625)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:264)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:624)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:183)
at
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:136)
at
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
... 3 more



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Reply: Re: FlinkKafkaConsumer on YARN: increasing the parallelism does not improve Kafka consumption speed, but submitting two applications does

2020-09-23 Thread 范超
Thanks Lei, it turned out that this was indeed the cause.

The effective parallelism of the source depends on the topic's partition count.


-----Original Message-----
From: 吴磊 [mailto:wuleifl...@foxmail.com]
Sent: Friday, September 18, 2020 16:29
To: user-zh
Subject: Re: FlinkKafkaConsumer on YARN: increasing the parallelism does not improve Kafka consumption speed, but submitting two applications does

Hello, whether the Source node's parallelism is effective depends on the number of partitions of the topic. For example, if you only have 6 partitions, a parallelism of 12 and a parallelism of 6 give the same consumption speed.




------ Original message ------
From: "user-zh"
 

Reply: FlinkKafkaConsumer on YARN: increasing the parallelism does not improve Kafka consumption speed, but submitting two applications does

2020-09-23 Thread 范超
Thanks for the reply, Benchao.

I have been busy stress-testing this issue over the past few days.
After several rounds of load tests (filling Kafka with data first, then consuming),
I found it is indeed the third situation you described:
because the Kafka topic has only one partition,
the FlinkKafkaConsumer assigns that single partition to a single consuming task. So although the job parallelism is large enough, with only one partition,
the other parallel tasks cannot get any partitions to consume, and raising the parallelism does not increase the job's consumption capacity proportionally.

Afterwards, by creating a topic with 2 partitions, the consumption capacity doubled.


One more question: if I want to push the job to its maximum throughput, which runtime parameters should I tune? So far, adjusting the on-YARN
TaskManager memory and the number of Kafka partitions has not pushed the job's throughput any higher.



-----Original Message-----
From: Benchao Li [mailto:libenc...@apache.org]
Sent: Friday, September 18, 2020 18:49
To: user-zh
Subject: Re: FlinkKafkaConsumer on YARN: increasing the parallelism does not improve Kafka consumption speed, but submitting two applications does

If you submit two jobs, they are completely independent and each will consume the full data.

If a single job's consumption capacity is not enough, look at where the bottleneck is, for example:
1. Does the job have lag? If there is no lag, there is actually no problem.
2. If the job has lag and the lag keeps growing, the current consumption capacity is insufficient; then check where the job's bottleneck is:
   some operator may be causing backpressure, limiting the whole job's consumption capacity;
   the job's overall CPU resources may be insufficient;
   or, in an extreme case, the parallelism is already large enough and each source subtask already maps to one Kafka
   partition, yet consumption is still insufficient; then a single partition simply carries more data than the corresponding Flink source operator can process.
3. If the job currently has lag but the lag is decreasing, the consumption capacity is actually sufficient and the data is just temporarily backed up.

范超  wrote on Fri, Sep 18, 2020 at 4:07 PM:

> Hi all, I have run into a strange problem.
>
> I am using Flink 1.10 and flink-connector-kafka_2.11.
>
> Running in Flink on YARN mode, no matter how much I increase the parallelism, the network output rate of the Kafka node (I use a single node) never goes up.
>
> But if I submit two identical applications, also in Flink on YARN mode, the Kafka node's network output rate doubles as expected.
>
> What I want is to increase the application's throughput by raising the parallelism, not by submitting one more app to the YARN cluster.
>
> Any guidance would be appreciated.
>


-- 

Best,
Benchao Li

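A small illustration (not from the thread) of the point above: the source parallelism only pays off
up to the topic's partition count, so the topic and the source have to be scaled together. The topic
name "events" and the Kafka properties are assumptions.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SourceParallelismExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092");
        kafkaProps.setProperty("group.id", "demo");

        // With a 1-partition topic, only one of these 4 source subtasks receives data;
        // the topic needs >= 4 partitions for the higher parallelism to matter.
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), kafkaProps))
                .setParallelism(4);

        stream.print();
        env.execute("source-parallelism-demo");
    }
}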

Re: Support for gRPC in Flink StateFun 2.x

2020-09-23 Thread Dalmo Cirne
Thank you so much for creating the ticket, Igal. We are looking forward to 
being able to use it!

And thank you for giving a little more context about how StateFun keeps a 
connection pool and tries to optimize for performance and throughput.

With that said, gRPC is an architectural choice we have made. It would be 
better to maintain project consistency, rather than opening exceptions here and 
there.

We will definitely take StateFun for a spin once we can use it with gRPC.

Cheers,

Dalmo



From: Igal Shilman 
Date: Wednesday, September 23, 2020 at 07:53
To: Dalmo Cirne 
Cc: "user@flink.apache.org" 
Subject: Re: Support for gRPC in Flink StateFun 2.x

Hi Dalmo,

Thanks a lot for sharing this use case!

If I understand the requirement correctly, you are mostly concerned with 
performance. In that case I've created
an issue [1] to add a gRPC transport for StateFun, and I believe we would be 
able to implement it in the upcoming weeks.

Just a side note about the way StateFun invokes remote functions via HTTP, at 
the moment:

- StateFun keeps a connection pool, to avoid re-establishing the connection for 
each request.
- StateFun batches requests per address (key) to amortize the cost of a round 
trip, and state shipment.

There is an RC2 for the upcoming StateFun version, with some improvements 
around HTTP functions,
and operational visibility (logs and metrics). So perhaps you can take that for 
a spin if you are evaluating StateFun
at the moment. The release itself is expected to happen at the end of this week.


[1] 
https://issues.apache.org/jira/browse/FLINK-19380

Thanks,
Igal.


On Tue, Sep 22, 2020 at 4:38 AM Dalmo Cirne 
mailto:dalmo.ci...@workday.com>> wrote:
Thank you for the quick reply, Igal.

Our use case is the following: A stream of data from Kafka is fed into Flink 
where data transformations take place. After that we send that transformed data 
to an inference engine to score the relevance of each record. (Rough 
simplification.)

Doing that using HTTP endpoints is possible, and it is the solution we have in 
place today, however, for each request to that endpoint, we need to incur the 
cost of establishing the connection, etc., thus increasing the latency of the 
system.

We do process data in batches to mitigate the latency, but it is not the same 
as having a bi-directional stream, as it would be possible using gRPC. 
Furthermore, we already use gRPC in other parts of our system.

We also want to be able to scale those endpoints up and down, as demand for the 
service fluctuates depending on the hour and day. Combining StateFun and 
Kubernetes would allow for that elasticity of the service, while keeping state 
of the execution, since inferences are not always just one endpoint, but a 
collection of them where the output of one becomes the input of the next, 
culminating with the predicted score(s).

We are evaluating StateFun because Flink is already part of the infrastructure. 
With that said, gRPC is also part of our requirements, thus motivation for the 
question.

I’d love to hear more about plans to implement support for gRPC and perhaps 
become an early adopter.

I hope this helps with understanding of the use case. Happy to talk further and 
answer more questions.

Best,

Dalmo



From: Igal Shilman mailto:i...@ververica.com>>
Date: Saturday, September 19, 2020 at 01:41
To: Dalmo Cirne mailto:dalmo.ci...@workday.com>>
Cc: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Re: Support for gRPC in Flink StateFun 2.x

Hi,

Your observation is correct, currently the only way to invoke a remote function 
is trough an HTTP POST request to a service that exposes a StateFun endpoint.

The endpoint must implement the client side of the “RequestReply” protocol as 
defined by StateFun (basically an invocation contains the state and message, 
and a response contains a description of the side effects).

While gRPC can be easily added as a replacement for the transport layer, the 
client side (the remote function) would still have to implement the 
RequestReply protocol.

To truly utilize gRPC we would want to introduce a new type of protocol, that 
can exploit the low latency bi-directional streams to and from the function.

While for the latter it is a bit difficult to commit to a specific date, the 
former can be easily implemented in the next StateFun release.

Would you be able to share with us a little bit more about your original 
motivation to ask this question :-)
This would help us as we gather more and more use cases.

For example: target language, environment, how gRPC services are being 
discovered.

Thanks,
Igal

On Thursday, September 17, 2020, 

Reusing Flink SQL Client's Environment for Flink pipelines

2020-09-23 Thread Dan Hill
Has anyone tried to reuse the Flink SQL Client's yaml Environment
configuration
for their production setups?  It seems pretty flexible.

I like most of the logic inside ExecutionContext.java
.
I'm curious to learn about issues from other Flink developers.


Re: RichFunctions in Flink's Table / SQL API

2020-09-23 Thread Piyush Narang
Hi Timo,

Thanks for getting back and filing the jira. I'll try to see if there's a way 
we can rework things to take advantage of the aggregate functions. 

-- Piyush
 

On 9/23/20, 3:55 AM, "Timo Walther"  wrote:

Hi Piyush,

unfortunately, UDFs have no direct access to Flink's state. Aggregate 
functions are the only type of functions that can be stateful at the 
moment. Aggregate functions store their state in an accumulator that is 
serialized/deserialized on access, but an accumulator field can be 
backed by a so-called DataView [1] which is directly backed by Flink's 
state. Maybe it is possible to leverage this functinality.

I created an issue to track this problem [2]. But of course this is not 
on the roadmap so far.

Regards,
Timo

[1] 

https://github.com/apache/flink/blob/release-1.9/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
[2] https://issues.apache.org/jira/browse/FLINK-19371

On 22.09.20 20:28, Piyush Narang wrote:
> Hi folks,
> 
> We were looking to cache some data using Flink’s MapState in one of our 
> UDFs that are called by Flink SQL queries. I was trying to see if 
> there’s a way to set up these state objects via the basic 
> FunctionContext [1] we’re provided in the Table / SQL 
> UserDefinedFunction class [2] but from what I can see it’s not possible. 
> We just seem to have access to retrieve the metric group and access to 
> the distributed cache / job params. Is there a way for us in Table / SQL 
> UDFs to access Flink’s state and store data? Or is this something that 
> isn’t supported / recommended? (If it helps we’re on Flink 1.9 and using 
> the old SQL planner).
> 
> Our broader use-case is to enrich some data coming in via a Kafka stream 
> by reading additional data in DynamoDB. We’d like to cache this across 
> restarts to cut down on some of the DynamoDb traffic. (Ideally we’d like 
> to move to temporal tables, but I think that requires a migration to 
> Blink first?)
> 
> Thanks,
> 
> [1] - 
> 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/FunctionContext.html
> 
> [2] - 
> 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html
> 
> -- Piyush
> 





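A rough sketch of the DataView approach Timo describes above, under the assumption of a Flink 1.9
style AggregateFunction: the accumulator's MapView field can be backed by Flink state rather than
being fully serialized with the accumulator, and lookupExternally() stands in for the DynamoDB call.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

public class CachedLookup extends AggregateFunction<String, CachedLookup.Acc> {

    public static class Acc {
        // may be mapped onto Flink state instead of living entirely in the serialized accumulator
        public MapView<String, String> cache = new MapView<>(Types.STRING, Types.STRING);
        public String latest;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, String key) throws Exception {
        String value = acc.cache.get(key);
        if (value == null) {
            value = lookupExternally(key); // hypothetical DynamoDB lookup
            acc.cache.put(key, value);
        }
        acc.latest = value;
    }

    @Override
    public String getValue(Acc acc) {
        return acc.latest;
    }

    private String lookupExternally(String key) {
        return "enriched-" + key;
    }
}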

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-23 Thread Lian Jiang
Dawid,

Thanks for the fix. I may wait for Flink 1.12 coming out at the end of Oct
this year. Meanwhile, I may want to better understand the current solution
at the beginning of this thread.

My observations:

1. ProcessFunction with streamEnv.getConfig().enableObjectReuse() -->
working

2. ProcessFunction without streamEnv.getConfig().enableObjectReuse() -->
Not working

Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
java.time.Instant: 2020-09-21T18:54:06.216Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:912)
at
org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:413)
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:874)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1293)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1293)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1286)
at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1224)
at
org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:243)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)



3. KeyedProcessFunction with streamEnv.getConfig().enableObjectReuse() -->
Not working

Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
java.time.Instant: 2020-09-21T19:52:58.477Z
at 
org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:912)
at 
org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:413)
at 
org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:874)
at 
org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:272)
at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:143)
at 
org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at 
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
at 
org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
at 
org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
at 
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
at 
org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at 
org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:186)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
at 

Re: How to stop multiple Flink jobs of the same name from being created?

2020-09-23 Thread Yang Wang
Hi Dan,

If you are using a K8s job to deploy the "INSERT INTO" SQL jobs into the
existing Flink cluster, then
you have to manage the lifecycle of these jobs by yourself. I think you
could use flink command line or
rest API to check the job status first.

Best,
Yang

Dan Hill  wrote on Wed, Sep 23, 2020 at 8:07 AM:

> Hi Yang!
>
> The multiple "INSERT INTO" jobs all go to the same Flink cluster.  I'm
> using this Helm chart
>  (which
> looks like the standalone option).  I deploy the job using a simple k8
> Job.  Sounds like I should do this myself.  Thanks!
>
> Thanks!
> - Dan
>
>
>
> On Tue, Sep 22, 2020 at 5:37 AM Yang Wang  wrote:
>
>> Hi Dan,
>>
>> First, I want to get more information about your submission so that we
>> could make the question clear.
>>
>> Are you using TableEnvironment to execute multiple "INSERT
>> INTO" statements and finding that each one will
>> be executed in a separate Flink cluster? That is really strange, and I
>> want to know how you are deploying your
>> Flink cluster on Kubernetes, via standalone [1] or native integration [2].
>> If it is the former, I am afraid you need
>> `kubectl` to start/stop your Flink application manually. If it is the
>> latter, I think the Flink cluster will be destroyed
>> automatically when the Flink job fails. Also, all the SQL jobs will be
>> executed in a shared Flink application.
>>
>> [1].
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
>> [2].
>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>>
>>
>> Best,
>> Yang
>>
>> Dan Hill  wrote on Mon, Sep 21, 2020 at 8:15 AM:
>>
>>> I've read the following upgrade application page
>>> .
>>> This seems to focus on doing this in a wrapper layer (e.g. Kubernetes).
>>> Just checking to see if this is the common practice or do people do this
>>> from their client jars.
>>>
>>>
>>>
>>> On Sun, Sep 20, 2020 at 5:13 PM Dan Hill  wrote:
>>>
 I'm prototyping with Flink SQL.  I'm iterating on a client job with
 multiple INSERT INTOs.  Whenever I have an error, my Kubernetes job
 retries.  This creates multiple stream jobs with the same names.

 Is it up to clients to delete the existing jobs?  I see Flink CLI
 functions for this.  Do most people usually do this from inside their
 client jar or their wrapper code (e.g. Kubernetes job).

 - Dan

>>>

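A hedged sketch of the "check the job status first" idea from Yang's reply: ask the JobManager's
REST endpoint /jobs/overview which jobs already exist, and skip submission if a RUNNING job with the
same name is present. The jobmanager host/port and the job name are assumptions.

import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class JobAlreadyRunningCheck {
    public static void main(String[] args) throws Exception {
        // Flink's monitoring REST API; "flink-jobmanager:8081" is the assumed k8s service
        URL url = new URL("http://flink-jobmanager:8081/jobs/overview");
        try (InputStream in = url.openStream();
             Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name())) {
            String json = scanner.useDelimiter("\\A").next();
            // the response lists each job's "name" and "state"; a real client would parse the JSON
            boolean alreadyRunning = json.contains("\"state\":\"RUNNING\"")
                    && json.contains("\"name\":\"my-insert-into-job\""); // hypothetical job name
            System.out.println(alreadyRunning ? "skip submission" : "safe to submit");
        }
    }
}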

Poor performance with large keys using RocksDB and MapState

2020-09-23 Thread ירון שני
Hello,
I have a poor throughput issue, and I think I managed to reproduce it using
the following code:

val conf = new Configuration()
conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE,
MemorySize.ofMebiBytes(6 * 1000))
conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(8 * 1000))
conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))

val be = new RocksDBStateBackend("file:///tmp")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
  .setStateBackend(be)

env.setParallelism(3)
env.getConfig.enableObjectReuse()

val r = new scala.util.Random(31)
val randStr = r.nextString(4992)
val s = env.fromElements(1).process((value: Int, ctx:
_root_.org.apache.flink.streaming.api.functions.ProcessFunction[Int,
_root_.scala.Predef.String]#Context, out:
_root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]) =>
{
  for (a <- 1 to 1000 * 1000 * 10) {
out.collect( randStr + r.nextString(8) )

  }
}).keyBy(a=>a).process(new ProcessFunction[String, String] {
  private var someState: MapState[String, String] = _

  override def open(parameters: Configuration): Unit = {
someState = getRuntimeContext.getMapState(
  new MapStateDescriptor[String, String]("someState",
createTypeInformation[String], createTypeInformation[String])
)
  }

  override def processElement(value: _root_.scala.Predef.String,
ctx: 
_root_.org.apache.flink.streaming.api.functions.ProcessFunction[_root_.scala.Predef.String,
_root_.scala.Predef.String]#Context, out:
_root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]):
Unit = {
if(!someState.contains(value)) {
  someState.put(value, value)
}
  }
})

env.execute()

This has really poor throughput.
Now changing
out.collect( randStr + r.nextString(8) )

to
out.collect( r.nextString(8) + randStr)
solves the issue.
Is there any easy way to fix this?
I tried to use a hash index, but it requires a RocksDB option called "prefix
extractor" which I don't know how to fill in yet, and I have no idea whether it will
fix the problem.
If anyone has encountered this before, I would really appreciate some advice/help.
Thanks!


[DISCUSS] ReplayableSourceStateBackend

2020-09-23 Thread Theo Diefenthal
Hi there, 

I just had the idea of a "ReplayableSourceStateBackend". I opened up a JIRA 
issue where I described the idea about it [1]. 

I would love to hear your feedback: Do you think it is possible to implement (I 
am not sure if a pipeline can be fully reconstructed from the source elements 
when it e.g. comes to closing session windows or such)? Would it be useful for 
your usecases as well? 

Best regards 
Theo 


[1] https://issues.apache.org/jira/browse/FLINK-19382 



Re: Stateful Functions + ML model prediction

2020-09-23 Thread John Morrow
Thanks very much Igal - that sounds like a good solution!

I'm new to StateFun so I'll have to dig into it a bit more, but this sounds 
like a good direction.

Thanks again,
John.


From: Igal Shilman 
Sent: Wednesday 23 September 2020 09:06
To: John Morrow 
Cc: user 
Subject: Re: Stateful Functions + ML model prediction

Hi John,

Thank you for sharing your interesting use case!

Let me start from your second question:
Are stateful functions available to all Flink jobs within a cluster?

Yes, the remote functions are some logic exposed behind an HTTP endpoint, and 
Flink would forward any message addressed to them via an HTTP request.
The way StateFun works is, for every invocation, StateFun would attach the 
necessary context (any previous state for a key, and the message) to the HTTP 
request.
So practically speaking the same remote function can be contacted by different 
Jobs, as the remote functions are effectively stateless.

 Does this sound like a good use case for stateful functions?

The way I would approach this is, I would consider moving the business rules 
and the enrichment to the remote function.
This would:

a) Eliminate the need for a broadcast stream, you can simply deploy a new 
version of the remote function container, as they can be independy restarted 
(without the need to restart the Flink job that contacts them)
b) You can perform the enrichment immediately without going through an 
RichAsyncFunction, as StateFun, by default, invokes many remote functions in 
parallel (but never for the same key)
c) You can contact the remote service that hosts the machine learning model, or 
even load the model in the remote function's process on startup.

So, in kubernetes terms:

1. You would need a set of pods (a deployment) that are able to serve HTTP 
traffic and expose a StateFun endpoint.
2. You would need a separate deployment for Flink that runs a StateFun job
3. The StateFun job would need to know how to contact these pods, so you would 
also need a kubernetes service (or a LoadBalancer) that
balances the requests from (2) to (1).

If you need to change your business rules, or the enrichment logic you can 
simply roll a new version of (1).


Good luck,
Igal.

On Tue, Sep 22, 2020 at 10:22 PM John Morrow 
mailto:johnniemor...@hotmail.com>> wrote:
Hi Flink Users,

I'm using Flink to process a stream of records containing a text field. The 
records are sourced from a message queue, enriched as they flow through the 
pipeline based on business rules and finally written to a database. We're using 
the Ververica platform so it's running on Kubernetes.

The initial business rules were straightforward, e.g. if field X contains a 
certain word then set field Y to a certain value. For the implementation I 
began by looking at 
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html for 
inspiration. I ended up implementing a business rule as a Java class with a 
match-predicate & an action. The records enter the pipeline on a data stream 
which is joined with the rules in a broadcast stream and a ProcessFunction 
checks each record to see if it matches any rule predicates. If the record 
doesn't match any business rule predicates it continues on in the pipeline. If 
the record does match one or more business rule predicates it is sent to a side 
output with the list of business rules that it matched. The side output data 
stream goes through a RichAsyncFunction which loops through the matched rules 
and applies each one's action to the record. At the end, that enriched 
side-output record stream is unioned back with the non-enriched record stream. 
This all worked fine.

I have some new business rules which are more complicated and require sending 
the record's text field to different pre-trained NLP models for prediction, 
e.g. if a model predicts the text language is X then update field Y to that 
value, if another model predicts the sentiment is positive then set some other 
field to another value. I'm planning on using seldon-core to serve these 
pre-trained models, so they'll also be available in the k8s cluster.

I'm not sure about the best way to set up these model prediction calls in 
Flink. I could add in a new ProcessFunction in my pipeline before my existing 
enrichment-rule-predicate ProcessFunction and have it send the text to each of 
the prediction models and add the results for each one to the record so it's 
available for the enrichment step. The downside of this is that in the future 
I'm anticipating having more and more models, and not necessarily wanting to 
send each record to every model for prediction. e.g. I might have a business 
rule which says if the author of the text is X then get the sentiment (via the 
sentiment model) and update field Z, so it would be a waste of time doing that 
for all records.

I had a look at stateful functions. There's an example in the 
statefun.io overview which shows having a 

Fault tolerance: StickyAllocationAndLocalRecoveryTestJob

2020-09-23 Thread cokle
Hello members,
I am new to the Apache Flink world, and over the last month I have been
exploring the testing scenarios offered by the Flink team and different books to
learn Flink.

Today I was trying to better understand this test, which you can find here:



 

I will try to explain how I understand it and maybe you can point out the
problems of my logic.

Testing parameters:
   delay = 1L;
   maxAttempts = 3;
   stateBackend = FsStateBackend


RandomLongSource:
First we create a data source implementing the CheckpointedFunction
interface. If the number of attempts is higher than the maximum number of
allowed attempts, we emit the last event and shut down the source;
otherwise, we continue emitting events.
 1.1 Why do we need maxAttempts in this scenario? Is that the number of
times we allow the application to fail?
The initializeState method is called every time the user-defined function is
initialized, be that when the function first starts or when it is actually recovering from an
earlier checkpoint. [1]



StateCreatingFlatMap:
After implementing the source, we use the flat map operator to
generate failure scenarios and test how Flink handles them. We
kill TaskManagers using the halt method if the PID corresponds to the
PID we decided to kill.
In the initializeState method, we handle how the recovery is done, and
if the state was previously restored we capture the info regarding it.


This is my understanding of the test source code, but it is not clear to me
how it really works and whether I am capturing the intended scenario
correctly.
I decided to test it using 1 JobManager and 3 TaskManagers (even though the max
operator parallelism is 1).

The application will start running and will constantly be checkpointed. At
some point a task will be killed and the application will be restored to
the last saved checkpoint. If the application has 4 failures (more than the
allowed 3 attempts), then we will successfully finish the application. Is
that correct?

2.1 Is this how the logic of the scenario works?
2.2 Is this an example of fault tolerance using checkpoints?

I will upload screenshots of the UI dashboard and an exception that I don't
really understand; some forums suggest it is a problem with the
job manager heap size.


Sorry if my question is not well formatted or sounds naive.
Best regards


 

 

 


[1]

 




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

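Not the actual test source, but a minimal sketch of the pattern the thread describes: a
CheckpointedFunction source whose initializeState() distinguishes a fresh start from a restore and
counts recoveries, which is what a maxAttempts-style check would look at.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class CountingRestoreSource extends RichParallelSourceFunction<Long>
        implements CheckpointedFunction {

    private volatile boolean running = true;
    private transient ListState<Integer> attemptsState;
    private int attempts;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        attemptsState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("attempts", Integer.class));
        if (context.isRestored()) {
            for (Integer previous : attemptsState.get()) {
                attempts = previous;
            }
            attempts++; // one more recovery from a checkpoint happened
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        attemptsState.clear();
        attemptsState.add(attempts);
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long value = 0;
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(value++);
            }
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}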

Re: Flink Statefun Byte Ingress

2020-09-23 Thread Igal Shilman
Hi,

For ingress, we don't look at the content at all, we put the bytes "as-is"
into the Any's value field, and we set the typeUrl field
with whatever was specified in the module.yaml.

See here for example:
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-python-k8s-example/module.yaml#L41

Hope that helps,
Igal.

On Wed, Sep 23, 2020 at 3:09 PM Timothy Bess  wrote:

> Hi Igal,
>
> Ah that definitely helps to know for Function -> Function invocations, but
> when doing Ingress via statefun how would that work? Is there a config I
> can set in the "module.yaml" to have it just pack arbitrary bytes into the
> Any?
>
> Thanks,
>
> Tim
>
> On Wed, Sep 23, 2020 at 7:01 AM Igal Shilman  wrote:
>
>> Hi Tim,
>>
>> You are correct, currently the argument to a remote function must be a
>> Protobuf Any, however StateFun doesn't interpret the contents of that Any,
>> and it would be passed as-is to the remote function.
>> As you mentioned in your email you can interpret the bytes as the bytes
>> of a JSON string.
>>
>> I hope it helps,
>> Igal.
>>
>> On Wed, Sep 23, 2020 at 5:06 AM Timothy Bess  wrote:
>>
>>> Hi,
>>>
>>> So most of the examples of "module.yaml" files I've seen focus on
>>> protobuf ingress, but is there a way to just get bytes from Kafka? I want
>>> to integrate this with the rest of my codebase which uses JSON, but don't
>>> want to migrate to protobuf just yet. I'm not totally sure how it would
>>> work since function arguments seem to be encoded as an Any type which is a
>>> protobuf type string + some bytes, I guess the string would need to be some
>>> made up constant value and I'd just grab the bytes? Honestly just using
>>> bytes like is done with the state value might be a bit more flexible to
>>> work with.
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>

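A small sketch (not from the thread) of what Igal describes: since the ingress bytes go into the
Any's value field as-is, a remote function can pull them back out and decode them as JSON itself.
MyEvent and the Jackson ObjectMapper are assumptions.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Any;

public class JsonFromAny {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static MyEvent decode(Any argument) throws Exception {
        // StateFun copied the raw Kafka record bytes into the Any's value field,
        // with the typeUrl taken from module.yaml
        byte[] raw = argument.getValue().toByteArray();
        return MAPPER.readValue(raw, MyEvent.class);
    }

    public static class MyEvent {
        public String id;
        public String payload;
    }
}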

Re: Support for gRPC in Flink StateFun 2.x

2020-09-23 Thread Igal Shilman
Hi Dalmo,

Thanks a lot for sharing this use case!

If I understand the requirement correctly, you are mostly concerned with
performance. In that case I've created
an issue [1] to add a gRPC transport for StateFun, and I believe we would
be able to implement it in the upcoming weeks.

Just a side note about the way StateFun invokes remote functions via HTTP,
at the moment:

- StateFun keeps a connection pool, to avoid re-establishing the connection
for each request.
- StateFun batches requests per address (key) to amortize the cost of a
round trip, and state shipment.

There is an RC2 for the upcoming StateFun version, with some
improvements around HTTP functions,
and operational visibility (logs and metrics). So perhaps you can take that
for a spin if you are evaluating StateFun
at the moment. The release itself is expected to happen at the end of this
week.


[1] https://issues.apache.org/jira/browse/FLINK-19380

Thanks,
Igal.


On Tue, Sep 22, 2020 at 4:38 AM Dalmo Cirne  wrote:

> Thank you for the quick reply, Igal.
>
>
>
> Our use case is the following: A stream of data from Kafka is fed into
> Flink where data transformations take place. After that we send that
> transformed data to an inference engine to score the relevance of each
> record. (Rough simplification.)
>
>
>
> Doing that using HTTP endpoints is possible, and it is the solution we
> have in place today, however, for each request to that endpoint, we need to
> incur the cost of establishing the connection, etc., thus increasing the
> latency of the system.
>
>
>
> We do process data in batches to mitigate the latency, but it is not the
> same as having a bi-directional stream, as it would be possible using gRPC.
> Furthermore, we already use gRPC in other parts of our system.
>
>
>
> We also want to be able to scale those endpoints up and down, as demand
> for the service fluctuates depending on the hour and day. Combining
> StateFun and Kubernetes would allow for that elasticity of the service,
> while keeping state of the execution, since inferences are not always just
> one endpoint, but a collection of them where the output of one becomes the
> input of the next, culminating with the predicted score(s).
>
>
>
> We are evaluating StateFun because Flink is already part of the
> infrastructure. With that said, gRPC is also part of our requirements, thus
> motivation for the question.
>
>
>
> I’d love to hear more about plans to implement support for gRPC and
> perhaps become an early adopter.
>
>
>
> I hope this helps with understanding of the use case. Happy to talk
> further and answer more questions.
>
>
>
> Best,
>
>
>
> Dalmo
>
>
>
>
>
>
>
> *From: *Igal Shilman 
> *Date: *Saturday, September 19, 2020 at 01:41
> *To: *Dalmo Cirne 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Support for gRPC in Flink StateFun 2.x
>
>
>
> Hi,
>
>
>
> Your observation is correct, currently the only way to invoke a remote
> function is trough an HTTP POST request to a service that exposes a
> StateFun endpoint.
>
>
>
> The endpoint must implement the client side of a the “RequestReply”
> protocol as defined by StateFun (basically an invocation contains the state
> and message, and a response contains a description of the side effects).
>
>
>
> While gRPC can be easily added a as a replacement for the transport layer,
> the client side (the remote function) would still have to implement the
> RequestReply protocol.
>
>
>
> To truly utilize gRPC we would want to introduce a new type of protocol,
> that can exploit the low latency bi-directional streams to and from the
> function.
>
>
>
> While for the later it is a bit difficult to commit for a specific date
> the former can be easily implemented in the next StateFun release.
>
>
>
> Would you be able to share with us a little bit more about your original
> motivation to ask this question :-)
>
> This would help us as we gather more and more use cases.
>
>
>
> For example: target language, environment, how gRPC services are being
> discovered.
>
>
>
> Thanks,
>
> Igal
>
>
>
> On Thursday, September 17, 2020, Dalmo Cirne 
> wrote:
>
> Hi,
>
>
>
> In the latest Flink Forward, from April 2020, there were mentions that
> adding support to gRPC, in addition to HTTP, was in the works and would be
> implemented in the future.
>
>
>
> Looking into the flink-statefun
> 
> repository on GitHub, one can see that there is already some work done with
> gRPC, but parity with its HTTP counterpart is not there, yet.
>
>
>
> Is there a roadmap or an estimate of when gRPC will be implemented in
> StateFun?
>
>
>
> Thank you,
>
>
>
> Dalmo
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-23 Thread Tianwang Li
We are using the `RocksDBStateBackend`. What is overusing the memory? We configured "taskmanager.memory.process.size:
4g"
and reserved 1 GB for jvm-overhead.
Usage now exceeds the limit by 2.8 GB, and I would like to understand what is consuming it.
If this cannot be controlled, the container is easily killed by the resource system (YARN, Kubernetes, etc.).


Has anyone else had experience with this?



Benchao Li  于2020年9月23日周三 下午1:12写道:

> Exceeding the YARN memory limit like this is not expected from state alone: if the state uses heap, it is on-heap memory and cannot exceed the configured maximum JVM heap;
> it would only cause a JVM OOM. Exceeding the YARN limit means the other JVM overheads add up beyond the total budget.
>
> 郑斌斌 wrote on Wednesday, September 23, 2020 at 12:29:
>
> >  I am hitting the same problem: a simple dual-stream interval join SQL job starts overusing memory after running for 4-5 days, and the container is then
> > KILLED by YARN.
> > Running a single stream is fairly normal.
> > The job memory is 4G. Version 1.11.1
> > --
> > 发件人:Benchao Li 
> > 发送时间:2020年9月23日(星期三) 10:50
> > 收件人:user-zh 
> > 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
> >
> > Hi Tianwang,
> >
> > I am not sure how your DataStream version is implemented, but from the SQL side there are two places where the amount of state grows:
> >
> > 1. The time interval join delays the watermark before emitting it, so the state of the downstream hop window becomes somewhat larger than
> > normal because the upstream join operator holds the watermark back. The current watermark delay is
> > `Math.max(leftRelativeSize, rightRelativeSize) +
> > allowedLateness`, which for your SQL should be 6h.
> > 2. The time interval join also has an optimization that avoids traversing the state too frequently. This optimization delays the cleanup of
> > expired data, i.e. state is cleaned up a bit more slowly than expected. With a left outer join you may even observe
> > records arriving later than the watermark, i.e. the downstream window may appear to lose data. This interval is currently
> > `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> > 2;`, which for your SQL is 3h, meaning the
> > state is kept *that much longer*. I opened an issue to optimize this point [1].
> >
> > Hope this answers your question~
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-18996
> >
> > Tianwang Li  于2020年9月22日周二 下午8:26写道:
> >
> > > Implementing interval join + hop window with Blink SQL overuses memory quite badly.
> > >
> > >
> > > [join]
> > >
> > > > SELECT `b`.`rowtime`,
> > > > `a`.`c_id`,
> > > > `b`.`openid`
> > > > FROM `test_table_a` AS `a`
> > > > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> > > > AND `a`.`openid` = `b`.`openid`
> > > > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime` - INTERVAL '0'
> > SECOND
> > > > AND `a`.`rowtime` + INTERVAL '6' HOUR
> > > >
> > > >
> > > [window]
> > >
> > > > SELECT HOP_ROWTIME(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR)
> > AS
> > > > `rowtime`,
> > > > HOP_START(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > > `__windoow_start__`,
> > > > HOP_END(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR) AS
> > > > `__window_end__`,
> > > > `c_id`,
> > > > COUNT(`openid`) AS `cnt`
> > > > FROM `test_table_in_6h`
> > > > GROUP BY HOP(`rowtime`, INTERVAL '5' MINUTE, INTERVAL '6' HOUR),
> > > > `c_id`
> > > >
> > >
> > >
> > > I configured the Flink memory as 4G, but actual usage reached 6.8 GB.
> > > With the same logic implemented via the Stream API, memory usage is only about 4.5 GB.
> > >
> > > [Configuration]
> > >
> > > > cat conf/flink-conf.yaml
> > > > jobmanager.rpc.address: flink-jobmanager
> > > > taskmanager.numberOfTaskSlots: 1
> > > > blob.server.port: 6124
> > > > jobmanager.rpc.port: 6123
> > > > taskmanager.rpc.port: 6122
> > > > jobmanager.heap.size: 6144m
> > > > taskmanager.memory.process.size: 4g
> > > > taskmanager.memory.jvm-overhead.min: 1024m
> > > > taskmanager.memory.jvm-overhead.max: 2048m
> > > > taskmanager.debug.memory.log-interval: 1
> > > > env.java.opts: "-Xloggc:/opt/flink/log/gc.log
> > > > -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCDetails
> > > > -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation
> > > -XX:NumberOfGCLogFiles=10
> > > > -XX:GCLogFileSize=10M -XX:+PrintPromotionFailure -XX:+PrintGCCause"
> > > >
> > >
> > >
> > >
> > > --
> > > **
> > >  tivanli
> > > **
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
> >
>
> --
>
> Best,
> Benchao Li
>


-- 
**
 tivanli
**


Re: Flink Statefun Byte Ingress

2020-09-23 Thread Timothy Bess
Hi Igal,

Ah that definitely helps to know for Function -> Function invocations, but
when doing Ingress via statefun how would that work? Is there a config I
can set in the "module.yaml" to have it just pack arbitrary bytes into the
Any?

Thanks,

Tim

On Wed, Sep 23, 2020 at 7:01 AM Igal Shilman  wrote:

> Hi Tim,
>
> You are correct, currently the argument to a remote function must be a
> Protobuf Any, however StateFun doesn't interpret the contents of that Any,
> and it would be passed as-is to the remote function.
> As you mentioned in your email you can interpret the bytes as the bytes of
> a JSON string.
>
> I hope it helps,
> Igal.
>
> On Wed, Sep 23, 2020 at 5:06 AM Timothy Bess  wrote:
>
>> Hi,
>>
>> So most of the examples of "module.yaml" files I've seen focus on
>> protobuf ingress, but is there a way to just get bytes from Kafka? I want
>> to integrate this with the rest of my codebase which uses JSON, but don't
>> want to migrate to protobuf just yet. I'm not totally sure how it would
>> work since function arguments seem to be encoded as an Any type which is a
>> protobuf type string + some bytes, I guess the string would need to be some
>> made up constant value and I'd just grab the bytes? Honestly just using
>> bytes like is done with the state value might be a bit more flexible to
>> work with.
>>
>> Thanks,
>>
>> Tim
>>
>


flinksql接收到字段值格式为2020-09-23T20:58:24+08:00,如何转成TIMESTAMP

2020-09-23 Thread chenxuying
The flinksql version is 1.11.2
The source receives the time as a string-typed field
CREATE TABLE sourceTable (
 `time` STRING
 ) WITH(
...
 );


The sink is as follows
CREATE TABLE sinktable (
`time1` STRING,
`time` TIMESTAMP(3)
 ) WITH (
 'connector' = 'print'
 );


Insert statement — I don't know how to correctly change the default format used by TO_TIMESTAMP
 insert into sinktable select 
`time`,TO_TIMESTAMP(`time`,'-MM-ddTHH:mm:ss+08:00') from sourceTable


The error says the format is wrong
Caused by: java.lang.IllegalArgumentException: Unknown pattern letter: T
at 
java.time.format.DateTimeFormatterBuilder.parsePattern(DateTimeFormatterBuilder.java:1663)
at 
java.time.format.DateTimeFormatterBuilder.appendPattern(DateTimeFormatterBuilder.java:1572)
at java.time.format.DateTimeFormatter.ofPattern(DateTimeFormatter.java:534)

Re: Stateful Functions + ML model prediction

2020-09-23 Thread Igal Shilman
Hi John,

Thank you for sharing your interesting use case!

Let me start from your second question:

> Are stateful functions available to all Flink jobs within a cluster?


Yes, the remote functions are some logic exposed behind an HTTP endpoint,
and Flink would forward any message addressed to them via an HTTP request.
The way StateFun works is, for every invocation, StateFun would attach the
necessary context (any previous state for a key, and the message) to the
HTTP request.
So practically speaking the same remote function can be contacted by
different Jobs, as the remote functions are effectively stateless.

 Does this sound like a good use case for stateful functions?


The way I would approach this is, I would consider moving the
business rules and the enrichment to the remote function.
This would:

a) Eliminate the need for a broadcast stream: you can simply deploy a new
version of the remote function container, as the containers can be independently
restarted (without the need to restart the Flink job that contacts them)
b) You can perform the enrichment immediately without going through
a RichAsyncFunction, as StateFun, by default, invokes many remote
functions in parallel (but never for the same key)
c) You can contact the remote service that hosts the machine learning
model, or even load the model in the remote function's process on startup.

So, in kubernetes terms:

1. You would need a set of pods (a deployment) that are able to serve HTTP
traffic and expose a StateFun endpoint.
2. You would need a separate deployment for Flink that runs a StateFun job
3. The StateFun job would need to know how to contact these pods, so you
would also need a kubernetes service (or a LoadBalancer) that
balances the requests from (2) to (1).

If you need to change your business rules, or the enrichment logic you can
simply roll a new version of (1).


Good luck,
Igal.

On Tue, Sep 22, 2020 at 10:22 PM John Morrow 
wrote:

> Hi Flink Users,
>
> I'm using Flink to process a stream of records containing a text field.
> The records are sourced from a message queue, enriched as they flow through
> the pipeline based on business rules and finally written to a database.
> We're using the Ververica platform so it's running on Kubernetes.
>
> The initial business rules were straightforward, e.g. if field X contains
> a certain word then set field Y to a certain value. For the implementation
> I began by looking at
> https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html for
> inspiration. I ended up implementing a business rule as a Java class with a
> match-predicate & an action. The records enter the pipeline on a data
> stream which is joined with the rules in a broadcast stream and a
> ProcessFunction checks each record to see if it matches any rule
> predicates. If the record doesn't match any business rule predicates it
> continues on in the pipeline. If the record does match one or more business
> rule predicates it is sent to a side output with the list of business rules
> that it matched. The side output data stream goes through a
> RichAsyncFunction which loops through the matched rules and applies each
> one's action to the record. At the end, that enriched side-output record
> stream is unioned back with the non-enriched record stream. This all worked
> fine.
>
> I have some new business rules which are more complicated and require
> sending the record's text field to different pre-trained NLP models for
> prediction, e.g. if a model predicts the text language is X then update
> field Y to that value, if another model predicts the sentiment is positive
> then set some other field to another value. I'm planning on using
> seldon-core to serve these pre-trained models, so they'll also be available
> in the k8s cluster.
>
> I'm not sure about the best way to set up these model prediction calls in
> Flink. I could add in a new ProcessFunction in my pipeline before my
> existing enrichment-rule-predicate ProcessFunction and have it send the
> text to each of the prediction models and add the results for each one to
> the record so it's available for the enrichment step. The downside of this
> is that in the future I'm anticipating having more and more models, and not
> necessarily wanting to send each record to every model for prediction. e.g.
> I might have a business rule which says if the author of the text is X then
> get the sentiment (via the sentiment model) and update field Z, so it would
> be a waste of time doing that for all records.
>
> I had a look at stateful functions. There's an example in the statefun.io
> overview which shows having a stateful function for doing a fraud model
> prediction based on if an account has had X number of frauds detected in
> the last 30 days, so the key for the state is an account number. In my
> case, these model predictions don't really have any state - they just take
> input and return a prediction, they're more like a stateless lambda
> function. Also, I was 

Re: metaspace out-of-memory & error while retrieving the leader gateway

2020-09-23 Thread Claude M
It was mentioned that this issue may be fixed in 1.10.3 but there is no
1.10.3 docker image here: https://hub.docker.com/_/flink


On Wed, Sep 23, 2020 at 7:14 AM Claude M  wrote:

> In regards to the metaspace memory issue, I was able to get a heap dump
> and the following is the output:
>
> Problem Suspect 1
> One instance of *"java.lang.ref.Finalizer"* loaded by *"<system class loader>"* occupies *4,112,624 (11.67%)* bytes. The instance is referenced
> by *sun.misc.Cleaner @ 0xb5d6b520*, loaded by *"<system class loader>"*.
> The memory is accumulated in one instance of *"java.lang.Object[]"* loaded
> by *"<system class loader>"*.
>
> Problem Suspect 2
> 33 instances of *"org.apache.flink.util.ChildFirstClassLoader"*, loaded by
>  *"sun.misc.Launcher$AppClassLoader @ 0xb4068680"* occupy *6,615,416
> (18.76%)*bytes.
>
> Based on this, I'm not clear on what needs to be done to solve this.
>
>
> On Tue, Sep 22, 2020 at 3:10 PM Claude M  wrote:
>
>> Thanks for your responses.
>> 1.  There were no job re-starts prior to the metaspace OEM.
>> 2.  I tried increasing the CPU request and still encountered the
>> problem.  Any configuration change I make to the job manager, whether it's
>> in the flink-conf.yaml or increasing the pod's CPU/memory request, results
>> with this problem.
>>
>>
>> On Tue, Sep 22, 2020 at 12:04 AM Xintong Song 
>> wrote:
>>
>>> Thanks for the input, Brain.
>>>
>>> This looks like what we are looking for. The issue is fixed in 1.10.3,
>>> which also matches this problem occurred in 1.10.2.
>>>
>>> Maybe Claude can further confirm it.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Tue, Sep 22, 2020 at 10:57 AM Zhou, Brian  wrote:
>>>
 Hi Xintong and Claude,



 In our internal tests, we also encounter these two issues and we spent
 much time debugging them. There are two points I need to confirm if we
 share the same problem.

1. Your job is using default restart strategy, which is per-second
restart.
2. Your CPU resource on jobmanager might be small



 Here is some findings I want to share.

 ## Metaspace OOM

 Due to https://issues.apache.org/jira/browse/FLINK-15467 , when we
 have some job restarts, there will be some threads from the sourceFunction
 hanging, cause the class loader cannot close. New restarts would load new
 classes, then expand the metaspace, and finally OOM happens.



 ## Leader retrieving

 Constant restarts may be heavy for jobmanager, if JM CPU resources are
 not enough, the thread for leader retrieving may be stuck.



 Best Regards,

 Brian



 *From:* Xintong Song 
 *Sent:* Tuesday, September 22, 2020 10:16
 *To:* Claude M; user
 *Subject:* Re: metaspace out-of-memory & error while retrieving the
 leader gateway



 ## Metaspace OOM

 As the error message already suggested, the metaspace OOM you
 encountered is likely caused by a class loading leak. I think you are on
 the right direction trying to look into the heap dump and find out where
 the leak comes from. IIUC, after removing the ZK folder, you are now able
 to run Flink with the heap dump options.



 The problem does not occur in previous versions because Flink starts to
 set the metaspace limit since the 1.10 release. The class loading leak
 might have already been there, but is never discovered. This could lead to
 unpredictable stability and performance issues. That's why Flink updated
 its memory model and explicitly set the metaspace limit in the 1.10 
 release.



 ## Leader retrieving

 The command looks good to me. If this problem happens only once, it
 could be irrelevant to adding the options. If that does not block you from
 getting the heap dump, we can look into it later.


 Thank you~

 Xintong Song





 On Mon, Sep 21, 2020 at 9:37 PM Claude M 
 wrote:

 Hi Xintong,



 Thanks for your reply.  Here is the command output w/ the java.opts:



 /usr/local/openjdk-8/bin/java -Xms768m -Xmx768m -XX:+UseG1GC
 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/flink/log
 -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
 -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
 -classpath
 /opt/flink/lib/flink-metrics-datadog-statsd-2.11-0.1.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:/opt/flink/lib/flink-table-blink_2.11-1.10.2.jar:/opt/flink/lib/flink-table_2.11-1.10.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.10.2.jar::/etc/hadoop/conf:
 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
 --configDir /opt/flink/conf --executionMode cluster



 To answer your questions:

 

Re: 编译Flink时找不到scala-maven-plugin:3.1.4

2020-09-23 Thread zilong xiao
Hi Natasha,
Try adding these two parameters to the mvn command: -Dscala-2.12 -Pscala-2.12

Natasha <13631230...@163.com> 于2020年9月23日周三 下午4:00写道:

> Hi All,
> 很高兴加入Flink这个大家庭!但是有个问题困扰了我好久!
> 当我导入Flink到IDEA中准备进行编译,输入“mvn clean install -Drat.skip=true
> -Dmaven.test.skip=true -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true”后,
> 报错“Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:3.1.4:testCompile
> (scala-test-compile) on project flink-runtime_2.10”,
>
> 我尝试了网上的方法,修改pom.xml文件中scala-maven-plugin的版本,或者是让IDEA的Scala版本与Windows的Scala版本保持一致,但是都不起作用!
>
>
>
>
> Best,
> Natasha
>
>
> | |
> Natasha
> |
> |
> |
> 签名由网易邮箱大师定制


Re: metaspace out-of-memory & error while retrieving the leader gateway

2020-09-23 Thread Claude M
In regards to the metaspace memory issue, I was able to get a heap dump and
the following is the output:

Problem Suspect 1
One instance of *"java.lang.ref.Finalizer"* loaded by *"<system class loader>"* occupies *4,112,624 (11.67%)* bytes. The instance is referenced by
 *sun.misc.Cleaner @ 0xb5d6b520*, loaded by *"<system class loader>"*. The
memory is accumulated in one instance of *"java.lang.Object[]"* loaded
by *"<system class loader>"*.

Problem Suspect 2
33 instances of *"org.apache.flink.util.ChildFirstClassLoader"*,
loaded by *"sun.misc.Launcher$AppClassLoader
@ 0xb4068680"* occupy *6,615,416 (18.76%)*bytes.

Based on this, I'm not clear on what needs to be done to solve this.


On Tue, Sep 22, 2020 at 3:10 PM Claude M  wrote:

> Thanks for your responses.
> 1.  There were no job re-starts prior to the metaspace OEM.
> 2.  I tried increasing the CPU request and still encountered the problem.
> Any configuration change I make to the job manager, whether it's in the
> flink-conf.yaml or increasing the pod's CPU/memory request, results
> with this problem.
>
>
> On Tue, Sep 22, 2020 at 12:04 AM Xintong Song 
> wrote:
>
>> Thanks for the input, Brain.
>>
>> This looks like what we are looking for. The issue is fixed in 1.10.3,
>> which also matches this problem occurred in 1.10.2.
>>
>> Maybe Claude can further confirm it.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Sep 22, 2020 at 10:57 AM Zhou, Brian  wrote:
>>
>>> Hi Xintong and Claude,
>>>
>>>
>>>
>>> In our internal tests, we also encounter these two issues and we spent
>>> much time debugging them. There are two points I need to confirm if we
>>> share the same problem.
>>>
>>>1. Your job is using default restart strategy, which is per-second
>>>restart.
>>>2. Your CPU resource on jobmanager might be small
>>>
>>>
>>>
>>> Here is some findings I want to share.
>>>
>>> ## Metaspace OOM
>>>
>>> Due to https://issues.apache.org/jira/browse/FLINK-15467 , when we have
>>> some job restarts, there will be some threads from the sourceFunction
>>> hanging, cause the class loader cannot close. New restarts would load new
>>> classes, then expand the metaspace, and finally OOM happens.
>>>
>>>
>>>
>>> ## Leader retrieving
>>>
>>> Constant restarts may be heavy for jobmanager, if JM CPU resources are
>>> not enough, the thread for leader retrieving may be stuck.
>>>
>>>
>>>
>>> Best Regards,
>>>
>>> Brian
>>>
>>>
>>>
>>> *From:* Xintong Song 
>>> *Sent:* Tuesday, September 22, 2020 10:16
>>> *To:* Claude M; user
>>> *Subject:* Re: metaspace out-of-memory & error while retrieving the
>>> leader gateway
>>>
>>>
>>>
>>> ## Metaspace OOM
>>>
>>> As the error message already suggested, the metaspace OOM you
>>> encountered is likely caused by a class loading leak. I think you are on
>>> the right direction trying to look into the heap dump and find out where
>>> the leak comes from. IIUC, after removing the ZK folder, you are now able
>>> to run Flink with the heap dump options.
>>>
>>>
>>>
>>> The problem does not occur in previous versions because Flink starts to
>>> set the metaspace limit since the 1.10 release. The class loading leak
>>> might have already been there, but is never discovered. This could lead to
>>> unpredictable stability and performance issues. That's why Flink updated
>>> its memory model and explicitly set the metaspace limit in the 1.10 release.
>>>
>>>
>>>
>>> ## Leader retrieving
>>>
>>> The command looks good to me. If this problem happens only once, it
>>> could be irrelevant to adding the options. If that does not block you from
>>> getting the heap dump, we can look into it later.
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Sep 21, 2020 at 9:37 PM Claude M  wrote:
>>>
>>> Hi Xintong,
>>>
>>>
>>>
>>> Thanks for your reply.  Here is the command output w/ the java.opts:
>>>
>>>
>>>
>>> /usr/local/openjdk-8/bin/java -Xms768m -Xmx768m -XX:+UseG1GC
>>> -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/flink/log
>>> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
>>> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
>>> -classpath
>>> /opt/flink/lib/flink-metrics-datadog-statsd-2.11-0.1.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar:/opt/flink/lib/flink-table-blink_2.11-1.10.2.jar:/opt/flink/lib/flink-table_2.11-1.10.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.15.jar:/opt/flink/lib/flink-dist_2.11-1.10.2.jar::/etc/hadoop/conf:
>>> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
>>> --configDir /opt/flink/conf --executionMode cluster
>>>
>>>
>>>
>>> To answer your questions:
>>>
>>>- Correct, in order for the pod to start up, I have to remove the
>>>flink app folder from zookeeper.  I only have to delete once after 
>>> applying
>>>the java.opts arguments.  It doesn't make sense though that I should have
>>>to do this just from adding a parameter.
>>>- I'm using the standalone deployment.
>>>- I'm using job 

Ignoring invalid values in KafkaSerializationSchema

2020-09-23 Thread Yuval Itzchakov
Hi,

I'm using a custom KafkaSerializationSchema to write records to Kafka using
FlinkKafkaProducer. The objects written are Rows coming from Flink's SQL
API.

In some cases, when trying to convert the Row object to a byte[],
serialization will fail due to malformed values. In such cases, I would
like the custom serialization schema to drop the bad records and not send
them through.

From the API, it is unclear how such failures should be handled. Given the
following signature:

 ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long
timestamp);

From reading the code, there's no exception handling or null checking,
which means that:

- If an exception is thrown, it will cause the entire job to fail (this has
happened to me in production)
- If null is passed, a null value will be pushed down to kafkaProducer.send
which is undesirable.

What are the options here?
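
One workaround I am considering (just a sketch, not necessarily the recommended approach): attempt the Row-to-bytes conversion in a flatMap before the Kafka sink and drop the records that fail, so the KafkaSerializationSchema only ever receives valid byte arrays. The class name and the placeholder serializeRow logic below are made up.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.nio.charset.StandardCharsets;

public class DroppingRowPreSerializer extends RichFlatMapFunction<Row, byte[]> {

    @Override
    public void flatMap(Row row, Collector<byte[]> out) {
        try {
            // Emit only the records that serialize successfully.
            out.collect(serializeRow(row));
        } catch (Exception e) {
            // Malformed value: drop the record (a metric counter could be added here).
        }
    }

    private byte[] serializeRow(Row row) {
        // Placeholder for the real Row -> byte[] conversion, which may throw.
        return row.toString().getBytes(StandardCharsets.UTF_8);
    }
}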



-- 
Best Regards,
Yuval Itzchakov.


Re:查询hbase sink结果表,有时查到数据,有时查不到

2020-09-23 Thread izual
The HBase sink buffers writes [1] and flushes them based on time or data volume [2]; could you check whether those settings have been adjusted?



1. 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
2. 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hbase.html
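
For reference, a sketch of the flush-related options, assuming the 1.11+ 'hbase-1.4' connector (the table name, ZooKeeper quorum and column family below are placeholders, and the flush values are examples only):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HBaseSinkFlushExample {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Smaller buffers / shorter intervals make upserts visible in HBase sooner,
        // at the cost of more round trips.
        tEnv.executeSql(
                "CREATE TABLE day_order_index ("
                        + "  rowkey STRING,"
                        + "  f ROW<orderN BIGINT>,"
                        + "  PRIMARY KEY (rowkey) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'hbase-1.4',"
                        + "  'table-name' = 'temp_dw:day_order_index',"   // placeholder
                        + "  'zookeeper.quorum' = 'zk1:2181',"            // placeholder
                        + "  'sink.buffer-flush.max-rows' = '1000',"
                        + "  'sink.buffer-flush.interval' = '1s'"
                        + ")");
    }
}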
 







在 2020-09-23 17:55:44,"kandy.wang"  写道:
>insert into hive.temp_dw.day_order_index select rowkey, ROW(orderN,)
>from
>(
>select order_date as rowkey,
>count(distinct parent_sn) as orderN,
>
>group by order_date
>)
>
>
>通过sql查hbase时,有时查到数据,有时候查不到数据。是不是group操作,会有下游算子 发送撤回消息,导致在delete 
>hbase的某条rowkey数据,导致客户端查不到数据?
>我理解 hbase sink 应该是upsert数据吧。会不会先delete 再put 导致这样的现象 ?


Flink Statefun Byte Ingress

2020-09-23 Thread Igal Shilman
Hi Tim,

You are correct, currently the argument to a remote function must be a
Protobuf Any, however StateFun doesn't interpret the contents of that Any,
and it would be passed as-is to the remote function.
As you mentioned in your email you can interpret the bytes as the bytes of
a JSON string.

I hope it helps,
Igal.

On Wed, Sep 23, 2020 at 5:06 AM Timothy Bess  wrote:

> Hi,
>
> So most of the examples of "module.yaml" files I've seen focus on protobuf
> ingress, but is there a way to just get bytes from Kafka? I want to
> integrate this with the rest of my codebase which uses JSON, but don't want
> to migrate to protobuf just yet. I'm not totally sure how it would work
> since function arguments seem to be encoded as an Any type which is a
> protobuf type string + some bytes, I guess the string would need to be some
> made up constant value and I'd just grab the bytes? Honestly just using
> bytes like is done with the state value might be a bit more flexible to
> work with.
>
> Thanks,
>
> Tim
>


kafka增加字段,hive表如何处理

2020-09-23 Thread china_tao
Flink 1.11.1, Flink SQL. We already read from Kafka and write into Hive with Flink SQL. The problem is that new fields were added to the Kafka source; how should the Hive table used in Flink
SQL be modified? If I add the columns directly in Hive, every startup reports that the Hive table already exists; if I use drop table if
exists, the historical data is lost. How do you all handle this? Thanks.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Efficiently processing sparse events in a time windows

2020-09-23 Thread Steven Murdoch
Hello,

I am trying to do something that seems like it should be quite simple but I 
haven’t found an efficient way to do this with Flink and I expect I’m missing 
something obvious here. 

The task is that I would like to process a sequence of events when a certain 
number appear within a keyed event-time window. There will be many keys but 
events within each keyed window will normally be quite sparse. 

My first guess was to use Flink’s sliding windowing functionality. However my 
concern is that events are duplicated for each window. I would like to be 
precise about timing so every event would trigger hundreds of copies of an 
event in hundreds of windows, most which are then discarded because there are 
insufficient events. 

My next guess was to use a process function, and maintain a queue of events as 
the state. When an event occurred I would add it to the queue and then remove 
any events which fell off the end of my window. I thought ListState would help 
here, but that appears to not allow items to be removed.

I then thought about using a ValueState with some queue data structure. However 
my understanding is that changes to a ValueState result in the entire object 
being copied and so would be quite inefficient and best avoided. 

Finally I thought about trying to just maintain a series of timers – 
incrementing on an event and decrementing on its expiry. However I then hit the 
problem of timer coalescing. If an event occurs at the same time as its 
predecessor, the timer will not get set so the counter will get incremented but 
never decremented. 
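
For concreteness, here is roughly what I have in mind for the timer variant if the counts were keyed by event timestamp, so that same-timestamp events just increment a stored count and one timer per distinct expiry time is enough (the event type MyEvent, the window length and the threshold are placeholders, and this assumes timestamped records) – though I am not sure this is idiomatic:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SparseWindowCounter extends KeyedProcessFunction<String, MyEvent, String> {

    private static final long WINDOW_MS = 60_000L; // example window length
    private static final int THRESHOLD = 5;        // example trigger count

    private transient MapState<Long, Integer> countsByTimestamp;
    private transient ValueState<Integer> total;

    @Override
    public void open(Configuration parameters) {
        countsByTimestamp = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("countsByTs", Long.class, Integer.class));
        total = getRuntimeContext().getState(
                new ValueStateDescriptor<>("total", Integer.class));
    }

    @Override
    public void processElement(MyEvent event, Context ctx, Collector<String> out) throws Exception {
        long ts = ctx.timestamp();
        Integer sameTs = countsByTimestamp.get(ts);
        countsByTimestamp.put(ts, sameTs == null ? 1 : sameTs + 1);

        int newTotal = (total.value() == null ? 0 : total.value()) + 1;
        total.update(newTotal);

        // Registering the same expiry time twice is harmless; onTimer removes the
        // whole per-timestamp count in one go.
        ctx.timerService().registerEventTimeTimer(ts + WINDOW_MS);

        if (newTotal >= THRESHOLD) {
            out.collect(ctx.getCurrentKey() + " saw " + newTotal + " events in the window");
        }
    }

    @Override
    public void onTimer(long timerTs, OnTimerContext ctx, Collector<String> out) throws Exception {
        long expiredTs = timerTs - WINDOW_MS;
        Integer expired = countsByTimestamp.get(expiredTs);
        if (expired != null) {
            countsByTimestamp.remove(expiredTs);
            total.update((total.value() == null ? 0 : total.value()) - expired);
        }
    }
}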

What I’m doing seems like it would be a common task but none of the options 
look good, so I feel I’m missing something. Could anyone offer some advice on 
how to handle this case?

Thanks in advance. 

Best wishes,
Steven


查询hbase sink结果表,有时查到数据,有时查不到

2020-09-23 Thread kandy.wang
insert into hive.temp_dw.day_order_index select rowkey, ROW(orderN,)
from
(
select order_date as rowkey,
count(distinct parent_sn) as orderN,

group by order_date
)


When querying HBase via SQL, sometimes the data is found and sometimes it is not. Is it because the group operation causes downstream operators to send retraction messages, which delete
the rowkey in HBase so that the client cannot find the data at that moment?
My understanding is that the HBase sink should upsert data. Could a delete followed by a put cause this behavior?

Re: Better way to share large data across task managers

2020-09-23 Thread Dongwon Kim
Hi Kostas,

Thanks for the input!

BTW, I guess you assume that the broadcasting occurs just once for
bootstrapping, huh?
My job needs not only bootstrapping but also periodically fetching a
new version of data from some external storage.

Thanks,

Dongwon

> 2020. 9. 23. 오전 4:59, Kostas Kloudas  작성:
>
> Hi Dongwon,





>
> If you know the data in advance, you can always use the Yarn options
> in [1] (e.g. the "yarn.ship-directories") to ship the directories with
> the data you want only once to each Yarn container (i.e. TM) and then
> write a udf which reads them in the open() method. This will allow the
> data to be shipped only once per TM but then each of the tasks will
> have its own copy in memory of course. By default the visibility of
> the files that you ship is set to APPLICATION [2], if I am not
> mistaken so if more than one TMs go to the same node, then you will
> have even less copies shipped.
>
> Does this help with your usecase?
>
> Cheers,
> Kostas
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
> [2] 
> https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html
>
>> On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim  wrote:
>> Hi,
>> I'm using Flink broadcast state similar to what Fabian explained in [1]. One 
>> difference might be the size of the broadcasted data; the size is around 
>> 150MB.
>> I've launched 32 TMs by setting
>> - taskmanager.numberOfTaskSlots : 6
>> - parallelism of the non-broadcast side : 192
>> Here's some questions:
>> 1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is it right?
>> 2) Any recommended way to broadcast data only to 32 TMs so that 6 tasks in 
>> each TM can read the broadcasted data? I'm considering implementing a static 
>> class for the non-broadcast side to directly load data only once on each 
>> TaskManager instead of the broadcast state (FYI, I'm using per-job clusters 
>> on YARN, so each TM is only for a single job). However, I'd like to use 
>> Flink native facilities if possible.
>> The type of broadcasted data is Map with around 600K entries, so 
>> every time the data is broadcasted a lot of GC is inevitable on each TM due 
>> to the (de)serialization cost.
>> Any advice would be much appreciated.
>> Best,
>> Dongwon
>> [1] https://flink.apache.org/2019/06/26/broadcast-state.html
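
For what it's worth, a rough sketch of the per-TaskManager loading idea discussed above (reading a Yarn-shipped file once per TM in open() and sharing it through a static field). The file name and value types are placeholders, this relies on all tasks of the job sharing the TM JVM (per-job cluster), and a periodic refresh would still need extra handling:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class EnrichmentMapper extends RichMapFunction<String, String> {

    // Shared by all subtasks (slots) running in the same TaskManager JVM, so the
    // ~150MB map is materialized only once per TM rather than once per task.
    private static volatile Map<String, String> SHARED;

    @Override
    public void open(Configuration parameters) throws Exception {
        if (SHARED == null) {
            synchronized (EnrichmentMapper.class) {
                if (SHARED == null) {
                    SHARED = load("shipped-data/lookup.tsv"); // hypothetical shipped file
                }
            }
        }
    }

    @Override
    public String map(String key) {
        return SHARED.getOrDefault(key, "unknown");
    }

    private static Map<String, String> load(String path) throws Exception {
        Map<String, String> m = new HashMap<>();
        for (String line : Files.readAllLines(Paths.get(path))) {
            String[] parts = line.split("\t", 2);
            if (parts.length == 2) {
                m.put(parts[0], parts[1]);
            }
        }
        return m;
    }
}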


Flink Kerberos认证问题

2020-09-23 Thread zhangjunj
Hello:
  For business reasons, Flink needs to connect to CDK (Kafka topics in a Kerberos-secured environment).
  On the same cluster, in Flink on YARN mode, the yarn-session resources are requested in the Kerberos environment via: yarn-session.sh
-n 2 -d -jm 2048 -tm 4096 -qu root.__ -D
security.kerberos.login.keytab=AAA.keytab -D
security.kerberos.login.principal=AAA. The requested resources then connect to CDK on the same cluster, with the following authentication settings added in the code:
System.setProperty("java.security.krb5.conf", krb5);
System.setProperty("java.security.auth.login.config", jaas);
System.setProperty("sun.security.krb5.debug", "true");
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", 
prop.getProperty("KafkaBrokers"));
properties.setProperty("group.id", 
prop.getProperty("GroupId"));


 On the same cluster, if the user authenticated for the yarn-session and the user authenticated for Kafka are the same, e.g. both use AAA_AAA_AAA.keytab,
then it works.
 On the same cluster, if the yarn-session user and
the Kafka user are different, e.g. the yarn-session is requested with AAA.keytab while the Kafka authentication uses BBB.keytab,
then it fails with the error: errors.TopicAuthorizationException: Not authorized to access
topics[]. Authentication does not pass. How can this problem be solved? I found no material on this on Google or Baidu, and the various approaches I tried myself did not work.




Many thanks!
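
In case it is useful, one approach that is sometimes used (an untested sketch; the paths and principals are placeholders) is to give the Kafka client its own JAAS entry via sasl.jaas.config, so the consumer can log in with the BBB keytab independently of the principal used for the yarn-session. If the error persists even when the client really authenticates as BBB, the remaining cause is usually missing ACLs for BBB on that topic.

import java.util.Properties;

public class KafkaKerberosClientProps {

    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        props.setProperty("group.id", "my-group");              // placeholder
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        // Per-client JAAS entry so the Kafka consumer uses BBB's keytab, independent
        // of the security.kerberos.login.* settings used for the yarn-session.
        props.setProperty("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required "
                        + "useKeyTab=true storeKey=true "
                        + "keyTab=\"/path/to/BBB.keytab\" "   // placeholder path
                        + "principal=\"BBB\";");
        return props;
    }
}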







flink sql grouping sets语义中NOT NULL不生效

2020-09-23 Thread kandy.wang
The SQL is as follows:
select
   (case when act_name is not null then  act_name else 'default_value' 
end) as act_name,
   (case when fst_plat is not null then  fst_plat  else 'default_value' 
end) as fst_plat,
sum(amount) as saleN
from  hive.temp_dw.view_trad_order_goods_source_act_last_value
group by  grouping sets((act_name),(act_name,fst_plat)


 hive.temp_dw.view_trad_order_goods_source_act_last_value is a view with the following schema:
Flink SQL> desc hive.temp_dw.view_trad_order_goods_source_act_last_value
 |-- act_name: STRING
 |-- fst_plat: STRING NOT NULL   
  ..
 The data type of fst_plat is STRING NOT NULL. In the (act_name) branch of the grouping
sets, fst_plat does not participate in the grouping.
 In practice, the result of (case when fst_plat is not null then fst_plat else 'default_value'
end) as fst_plat never reaches the else branch ('default_value').
 It feels as if fst_plat is not null does not take effect, and it seems related to fst_plat being declared STRING NOT NULL;
with that constraint in place, the NOT NULL semantics get broken.

Re: flink sql延迟数据

2020-09-23 Thread Benchao Li
Are you using the Blink planner's TUMBLE window? If so, you can handle late data by configuring the state retention time [1].
The effective allowed lateness is the min retention time you configure.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
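
For reference, a sketch of how the retention can be set programmatically (assuming Flink 1.10 with the Blink planner; the concrete times are examples only, and per the note above the min retention is what effectively bounds how late an update can still be applied):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Keep idle state for at least 10 and at most 20 minutes (example values;
        // Flink requires max to exceed min by at least 5 minutes).
        tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(10), Time.minutes(20));
    }
}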

ang <806040...@qq.com> 于2020年9月23日周三 下午4:24写道:

> hi各位,有个问题请教一下:
> 我现在使用flink sql统计一下kafka中在某个时间窗口内指定字段出现的次数,使用event
> time,需要在5s内输出结果,但是数据会有一些延迟,可能大于5s,目前设置waterwark为
> WATERMARK FOR ts AS ts - INTERVAL '5' SECODND
> ,但是这样延迟大于5s的数据就会被丢弃掉,请问下其他延迟的数据有没有什么办法进行处理?我看datastream api里面可以使用allowed
> lateness,但是这部分在sql中没看到有相关语法
>
>
> Flink版本1.10.1
> 



-- 

Best,
Benchao Li


Re: flink pb转json性能问题

2020-09-23 Thread Benchao Li
Hi kandy,

Regarding your first question, the community plans to add a built-in protobuf format [1]. It most likely won't make it into 1.12, but 1.13 should be about right.

[1] https://issues.apache.org/jira/browse/FLINK-18202

kandy.wang  于2020年9月23日周三 下午4:55写道:

> 因flink目前不支持pb format,调用了,protobuf-java-util
> com.google.protobuf.utilJsonFormat.printer().preservingProtoFieldNames().print(message)
> 先再pb 转成json 再套用 JsonRowDataDeserializationSchema处理json,
> 发现处理的性能就只能达到20w左右的tps,而如果是处理json格式的数据,tps是可以达到50-60w的tps.
> 想问一下,1、flink要是处理pb格式的数据,有什么好的办法? 2
> 、社区对pb format 会支持么?
> 3、pb转json 有什么性能比较好的工具包



-- 

Best,
Benchao Li


flink pb转json性能问题

2020-09-23 Thread kandy.wang
Since Flink does not currently support a protobuf format, we call protobuf-java-util's
com.google.protobuf.util.JsonFormat.printer().preservingProtoFieldNames().print(message)
to first convert the PB message into JSON and then process the JSON with JsonRowDataDeserializationSchema.
The throughput only reaches about 200K TPS, whereas processing JSON-formatted data directly reaches 500-600K TPS.
Questions: 1. Is there a good way for Flink to process protobuf-formatted data? 2.
Will the community support a pb format?
3. Is there a protobuf-to-JSON library with better performance?
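
One direction that avoids the PB-to-JSON hop entirely (a sketch only, assuming you have the protobuf-generated Java class for your messages; MyMessage and the field mapping below are placeholders): parse the generated class directly in a custom DeserializationSchema and build the Row yourself, then register the resulting DataStream as a table.

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import java.io.IOException;

// MyMessage stands for your protobuf-generated class.
public class ProtoToRowDeserializationSchema implements DeserializationSchema<Row> {

    @Override
    public Row deserialize(byte[] message) throws IOException {
        MyMessage msg = MyMessage.parseFrom(message); // generated parser, no JSON step
        Row row = new Row(2);
        row.setField(0, msg.getId());
        row.setField(1, msg.getName());
        return row;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return Types.ROW(Types.LONG, Types.STRING);
    }
}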

Re: 编译Flink时找不到scala-maven-plugin:3.1.4

2020-09-23 Thread tison
From the log, your Scala version is 2.10; relatively recent Flink versions only support 2.11 and 2.12.

Best,
tison.


Natasha <13631230...@163.com> 于2020年9月23日周三 下午4:00写道:

> Hi All,
> 很高兴加入Flink这个大家庭!但是有个问题困扰了我好久!
> 当我导入Flink到IDEA中准备进行编译,输入“mvn clean install -Drat.skip=true
> -Dmaven.test.skip=true -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true”后,
> 报错“Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:3.1.4:testCompile
> (scala-test-compile) on project flink-runtime_2.10”,
>
> 我尝试了网上的方法,修改pom.xml文件中scala-maven-plugin的版本,或者是让IDEA的Scala版本与Windows的Scala版本保持一致,但是都不起作用!
>
>
>
>
> Best,
> Natasha
>
>
> | |
> Natasha
> |
> |
> |
> 签名由网易邮箱大师定制


Re: How to disconnect taskmanager via rest api?

2020-09-23 Thread Luan Cooper
thanks
I'll create a new issue for this feature on github


On Mon, Sep 21, 2020 at 11:51 PM Timo Walther  wrote:

> Hi Luan,
>
> this sound more of a new feature request to me. Maybe you can already
> open an issue for it.
>
> I will loop in Chesnay in CC if there is some possibility to achieve
> this already?
>
> Regards,
> Timo
>
> On 21.09.20 06:37, Luan Cooper wrote:
> > Hi
> >
> > We're running flink standalone cluster on k8s
> > when deleting a taskmanager pod manually, jobmanager *should disconnect
> > it immediately*
> >
> > however no such rest api available right now
> > we have to wait `akka.tcp.timeout` which means 30s later or more
> >
> > What if I want to disconnect tm via rest api
> > Which way did you suggest ?
> >
> > 1. add disconnectTaskManager to
> > org.apache.flink.runtime.dispatcher.Dispatcher
> > which means a new Interface
> >
> > CompletableFuturedisconnectTaskManager(JobID jobId,
> ResourceID resourceId);
> >
> > in org.apache.flink.runtime.webmonitor.RestfulGateway
> >
> > 2. Any other suggestions?
> >
> > Thanks
>
>


flink sql延迟数据

2020-09-23 Thread ang
Hi everyone, I have a question:
I am using Flink SQL to count, within a time window, how often a given field appears in a Kafka topic, using event
time. Results need to be produced within 5s, but some data arrives late, possibly by more than 5s. The watermark is currently defined as
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
so data that is more than 5s late is simply dropped. Is there any way to handle the late data? The DataStream
API has allowed lateness, but I did not find corresponding syntax in SQL.


Flink version 1.10.1


Re: Calcite在嵌套多层包含where条件的sql语句时优化器OOM

2020-09-23 Thread jun su
hi danny & godfrey

Looking at the debug logs, 99% of the hits are CalcMergeRule. I see that Blink uses FlinkCalcMergeRule,
which adds some filtering of non-deterministic expressions in its matches method,
so I replaced CalcMergeRule with FlinkCalcMergeRule and updated FlinkRuleSets accordingly.
After rerunning, 99% of the debug log hits are now the updated FlinkCalcMergeRule.

Danny Chan  于2020年9月23日周三 下午12:32写道:

> It is probably hitting a cycle of node references, which makes the optimization rule fire over and over. You can turn on debug logging to see which rule
> is triggered frequently. I fixed a similar problem before [1]; you can take a look.
>
> [1] https://issues.apache.org/jira/browse/CALCITE-3121
>
> Best,
> Danny Chan
> 在 2020年9月23日 +0800 AM10:23,jun su ,写道:
> > hi godfrey,
> > 方便说下是哪些rule fix了这个问题么? 我对这个比较好奇 , 想看下是什么原因导致的
> >
> > godfrey he  于2020年9月23日周三 上午10:09写道:
> >
> > > Hi Jun,
> > >
> > > 可能是old planner缺少一些rule导致遇到了corner case,
> > > blink planner之前解过一些类似的案例。
> > >
> > > jun su  于2020年9月23日周三 上午9:53写道:
> > >
> > > > hi godfrey,
> > > >
> > > > 刚看了下, blink应该也会用hep , 上文说错了
> > > >
> > > > jun su  于2020年9月23日周三 上午9:19写道:
> > > >
> > > > > hi godfrey,
> > > > > 我用了最新代码的blink没这个问题, 我看代码flink是先用hep然后进valcano, 而blink貌似没用hep,
> > > > > 我将hep代码注释后valcano的迭代次数会大幅减少, 语句嵌套10层基本在4000次左右能获取最佳方案,我再debug看下原因
> > > > >
> > > > > godfrey he  于2020年9月22日周二 下午8:58写道:
> > > > >
> > > > > > blink planner 有这个问题吗?
> > > > > >
> > > > > > jun su  于2020年9月22日周二 下午3:27写道:
> > > > > >
> > > > > > > hi all,
> > > > > > >
> > > > > > > 环境: flink-1.9.2 flink table planner
> > > > > > > 现象: 代码一直在 VolcanoPlanner.findBestExp()方法中出不来, 直到OOM
> > > > > > >
> > > > > > > 发现在嵌套4层时 findBestExp方法中while(true)会循环3w多次后成功退出, 嵌套5层会达到几十万级别,
> > > > 导致进程OOM
> > > > > > > ---
> > > > > > > 代码:
> > > > > > >
> > > > > > > fbTableEnv.registerTableSource("source",orcTableSource)
> > > > > > >
> > > > > > > val select = fbTableEnv.sqlQuery("select
> > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> source
> > > ")
> > > > > > >
> > > > > > > fbTableEnv.registerTable("selectTable",select)
> > > > > > >
> > > > > > > val t1 = fbTableEnv.sqlQuery("select
> > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from
> > > > selectTable
> > > > > > > where Auth_Roles like 'a%'")
> > > > > > > fbTableEnv.registerTable("t1",t1)
> > > > > > >
> > > > > > > val t2 = fbTableEnv.sqlQuery("select
> > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t1
> where
> > > > > > > Target_UserSid= 'b'")
> > > > > > > fbTableEnv.registerTable("t2",t2)
> > > > > > >
> > > > > > > val t3 = fbTableEnv.sqlQuery("select
> > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t2
> where
> > > > > > > Thread_ID= 'c'")
> > > > > > > fbTableEnv.registerTable("t3",t3)
> > > > > > >
> > > > > > > val t4 = fbTableEnv.sqlQuery("select
> > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t3
> where
> > > > > > > access_path= 'd'")
> > > > > > > fbTableEnv.registerTable("t4",t4)
> > > > > > >
> > > > > > > val t5 = fbTableEnv.sqlQuery("select
> > > > > > > Auth_Roles,Target_UserSid,Thread_ID,access_path,action from t4
> where
> > > > > > > action= 'e'")
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Best,
> > > > > > > Jun Su
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best,
> > > > > Jun Su
> > > > >
> > > >
> > > >
> > > > --
> > > > Best,
> > > > Jun Su
> > > >
> > >
> >
> >
> > --
> > Best,
> > Jun Su
>


-- 
Best,
Jun Su


编译Flink时找不到scala-maven-plugin:3.1.4

2020-09-23 Thread Natasha
Hi All,
I'm very happy to join the Flink family! But one problem has bothered me for a long time.
When I import Flink into IDEA and try to build it with "mvn clean install -Drat.skip=true
-Dmaven.test.skip=true -Dmaven.javadoc.skip=true -Dcheckstyle.skip=true",
it fails with "Failed to execute goal
net.alchim31.maven:scala-maven-plugin:3.1.4:testCompile (scala-test-compile) on
project flink-runtime_2.10".
I tried the fixes found online, such as changing the scala-maven-plugin version in pom.xml or making the Scala version in IDEA match the Scala version installed on Windows, but neither helps.




Best,
Natasha


| |
Natasha
|
|
|
签名由网易邮箱大师定制

Re: RichFunctions in Flink's Table / SQL API

2020-09-23 Thread Timo Walther

Hi Piyush,

unfortunately, UDFs have no direct access to Flink's state. Aggregate 
functions are the only type of functions that can be stateful at the 
moment. Aggregate functions store their state in an accumulator that is 
serialized/deserialized on access, but an accumulator field can be 
backed by a so-called DataView [1] which is directly backed by Flink's 
state. Maybe it is possible to leverage this functionality.


I created an issue to track this problem [2]. But of course this is not 
on the roadmap so far.


Regards,
Timo

[1] 
https://github.com/apache/flink/blob/release-1.9/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java

[2] https://issues.apache.org/jira/browse/FLINK-19371
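
For illustration, a rough sketch of an accumulator whose cache lives in a MapView (Flink 1.9-era API; whether the old planner maps this onto state in your setup would need to be verified, and the names and the external lookup below are placeholders):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

public class CachingLookupAgg extends AggregateFunction<String, CachingLookupAgg.Acc> {

    // Accumulator POJO; the MapView field can be backed by Flink state rather than
    // being fully serialized on every access.
    public static class Acc {
        public MapView<String, String> cache = new MapView<>(Types.STRING, Types.STRING);
        public String latest;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, String key) throws Exception {
        String cached = acc.cache.get(key);
        if (cached == null) {
            cached = lookupExternally(key); // placeholder for the DynamoDB call
            acc.cache.put(key, cached);
        }
        acc.latest = cached;
    }

    @Override
    public String getValue(Acc acc) {
        return acc.latest;
    }

    private String lookupExternally(String key) {
        return "value-for-" + key; // hypothetical
    }
}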

On 22.09.20 20:28, Piyush Narang wrote:

Hi folks,

We were looking to cache some data using Flink’s MapState in one of our 
UDFs that are called by Flink SQL queries. I was trying to see if 
there’s a way to set up these state objects via the basic 
FunctionContext [1] we’re provided in the Table / SQL 
UserDefinedFunction class [2] but from what I can see it’s not possible. 
We just seem to have access to retrieve the metric group and access to 
the distributed cache / job params. Is there a way for us in Table / SQL 
UDFs to access Flink’s state and store data? Or is this something that 
isn’t supported / recommended? (If it helps we’re on Flink 1.9 and using 
the old SQL planner).


Our broader use-case is to enrich some data coming in via a Kafka stream 
by reading additional data in DynamoDB. We’d like to cache this across 
restarts to cut down on some of the DynamoDb traffic. (Ideally we’d like 
to move to temporal tables, but I think that requires a migration to 
Blink first?)


Thanks,

[1] - 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/FunctionContext.html


[2] - 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html


-- Piyush





回复:[flink-1.10.2] Blink SQL 超用内存严重

2020-09-23 Thread 郑斌斌
  Thanks Peidian, I will give it a try.
--
发件人:Peidian Li 
发送时间:2020年9月23日(星期三) 14:02
收件人:user-zh ; 郑斌斌 
主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

我也遇到过类似的问题,也是使用rocksdb,flink 1.10版本,我理解的是block 
cache超用,我这边的解决办法是增大了taskmanager.memory.jvm-overhead.fraction,如果仍然出现内存超用这个问题,可以尝试调大taskmanager.memory.task.off-heap.size,我这边这两个参数配置分别为taskmanager.memory.jvm-overhead.fraction=0.2,taskmanager.memory.task.off-heap.size=128m

可以尝试一下。
郑斌斌  于2020年9月23日周三 下午1:33写道:
 Thanks for the answer, Benchao. Below are my logs. I am using RocksDB with incremental state;
when the checkpoint data volume is large it reaches several GB. RocksDB uses off-heap memory, and I suspect that is related, but I don't know how to diagnose it.

 java.lang.Exception: [2020-09-14 09:27:20.431]Container 
[pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is running 
36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of 4 GB 
physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing container.
 Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
  |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0 
-Djobmanager.rpc.address=njdev-nn03.nj 
-Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c 
-Djobmanager.rpc.port=30246 -Drest.address=flink-node03
  |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c 
/usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798 
-XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=359703515b -D 
taskmanager.memory.network.min=359703515b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
taskmanager.memory.task.heap.size=1530082070b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0' 
-Djobmanager.rpc.address='flink-node03' 
-Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c' 
-Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out
 2> 
/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
 
 [2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
 [2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143. 

  at 
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 --
 发件人:Benchao Li 
 发送时间:2020年9月23日(星期三) 13:12
 收件人:user-zh ; 郑斌斌 
 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
 郑斌斌  于2020年9月23日周三 下午12:29写道:
  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN KILL 。
  单流跑的话,比较正常。
  JOB的内存是4G。版本1.11.1
  --
  发件人:Benchao Li 
  发送时间:2020年9月23日(星期三) 10:50
  收件人:user-zh 
  主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重

  Hi Tianwang,

  不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加

  1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
  join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
  `Math.max(leftRelativeSize, rightRelativeSize) +
  allowedLateness`,根据你的SQL,这个值应该是6h
  2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
  清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
  数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
  `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
  2;`,在你的SQL来讲,就是3h,也就是说
  状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]

Flink stateful functions and Event Driven microservices

2020-09-23 Thread Mazen Ezzeddine
Hello,

What are the differences between Flink stateful functions and event-driven
microservices? Are they almost the same concept? I am aware that Flink
stateful functions provide out-of-the-box functionality like exactly-once
processing guarantees on failure and recovery, and a stateful middle tier (RocksDB,
memory) with state partitioning over a cluster. On the other hand,
building event-driven microservices using Spring Boot might give better
performance (latency and real-time guarantees) and better control of
scalability at the granularity of individual microservices, etc. Any more
insights on this front?

Thank you.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Back pressure with multiple joins

2020-09-23 Thread Dan Hill
When I use DataStream and implement the join myself, I can get 50x the
throughput.  I assume I'm doing something wrong with Flink's Table API and
SQL interface.

On Tue, Sep 22, 2020 at 11:21 PM Dan Hill  wrote:

> Hi!
>
> My goal is to better understand how my code impacts streaming throughput.
>
> I have a streaming job where I join multiple tables (A, B, C, D) using
> interval joins.
>
> Case 1) If I have 3 joins in the same query, I don't hit back pressure.
>
> SELECT ...
> FROM A
> LEFT JOIN B
> ON...
> LEFT JOIN C
> ON...
> LEFT JOIN D
> ON...
>
>
> Case 2) If I create temporary views for two of the joins (for reuse with
> another query), I hit back a lot of back pressure.  This is selecting
> slightly more fields than the first.
>
> CREATE TEMPORARY VIEW `AB`
>
> SELECT ...
> FROM A
> LEFT JOIN B
> ...
>
> CREATE TEMPORARY VIEW `ABC`
> SELECT ...
> FROM AB
> LEFT JOIN C
> ...
>
>
>
> Can Temporary Views increase back pressure?
>
> If A, B, C and D are roughly the same size (fake data), does the join
> order matter?  E.g. I assume reducing the size of the columns in each join
> stage would help.
>
> Thanks!
> - Dan
>
>
>


Back pressure with multiple joins

2020-09-23 Thread Dan Hill
Hi!

My goal is to better understand how my code impacts streaming throughput.

I have a streaming job where I join multiple tables (A, B, C, D) using
interval joins.

Case 1) If I have 3 joins in the same query, I don't hit back pressure.

SELECT ...
FROM A
LEFT JOIN B
ON...
LEFT JOIN C
ON...
LEFT JOIN D
ON...


Case 2) If I create temporary views for two of the joins (for reuse with
another query), I hit back a lot of back pressure.  This is selecting
slightly more fields than the first.

CREATE TEMPORARY VIEW `AB`

SELECT ...
FROM A
LEFT JOIN B
...

CREATE TEMPORARY VIEW `ABC`
SELECT ...
FROM AB
LEFT JOIN C
...



Can Temporary Views increase back pressure?

If A, B, C and D are roughly the same size (fake data), does the join order
matter?  E.g. I assume reducing the size of the columns in each join stage
would help.

Thanks!
- Dan


Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-23 Thread Peidian Li
I have run into a similar problem, also with RocksDB on Flink 1.10. My understanding is that the RocksDB block cache overuses memory. My fix was to increase
taskmanager.memory.jvm-overhead.fraction.

If the memory overuse still occurs, you can also try increasing taskmanager.memory.task.off-heap.size.

On my side these two parameters are set to taskmanager.memory.jvm-overhead.fraction=0.2 and taskmanager.memory.task.off-heap.size=128m.

You can give it a try.

郑斌斌  于2020年9月23日周三 下午1:33写道:

> 谢谢Benchao的回答 ,下面是我的日志,我用的是rocksdb 增量保存state,
> checkpoint数据量大时有好几个G。rocksdb使用堆外内存, 感觉好像与这块有关。但不知道用什么办法能诊断出来
>
> java.lang.Exception: [2020-09-14 09:27:20.431]Container
> [pid=10644,containerID=container_e91_1597199405327_9343_01_000298] is
> running 36864B beyond the 'PHYSICAL' memory limit. Current usage: 4.0 GB of
> 4 GB physical memory used; 6.6 GB of 8.4 GB virtual memory used. Killing
> container.
> Dump of the process-tree for container_e91_1597199405327_9343_01_000298 :
>  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>  |- 10771 10644 10644 10644 (java) 418009 60041 7044694016 1048239
> /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798
> -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner -D
> taskmanager.memory.framework.off-heap.size=134217728b -D
> taskmanager.memory.network.max=359703515b -D
> taskmanager.memory.network.min=359703515b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=1530082070b -D
> taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port=0
> -Djobmanager.rpc.address=njdev-nn03.nj
> -Dweb.tmpdir=/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c
> -Djobmanager.rpc.port=30246 -Drest.address=flink-node03
>  |- 10644 10642 10644 10644 (bash) 0 1 11919360 346 /bin/bash -c
> /usr/java/jdk1.8.0_181-cloudera/bin/java -Xmx1664299798 -Xms1664299798
> -XX:MaxDirectMemorySize=493921243 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
> org.apache.flink.yarn.YarnTaskExecutorRunner -D
> taskmanager.memory.framework.off-heap.size=134217728b -D
> taskmanager.memory.network.max=359703515b -D
> taskmanager.memory.network.min=359703515b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=1530082070b -D
> taskmanager.memory.task.off-heap.size=0b --configDir . -Dweb.port='0'
> -Djobmanager.rpc.address='flink-node03'
> -Dweb.tmpdir='/tmp/flink-web-30e7fd97-d300-4d16-869b-fae300ec3f5c'
> -Djobmanager.rpc.port='30246' -Drest.address='flink-node03' 1>
> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.out
> 2>
> /data03/yarn/container-logs/application_1597199405327_9343/container_e91_1597199405327_9343_01_000298/taskmanager.err
>
> [2020-09-14 09:27:20.448]Container killed on request. Exit code is 143
> [2020-09-14 09:27:20.466]Container exited with a non-zero exit code 143.
>
>  at
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> --
> 发件人:Benchao Li 
> 发送时间:2020年9月23日(星期三) 13:12
> 收件人:user-zh ; 郑斌斌 
> 主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
> 超yarn内存不合理。因为state如果用的是heap,那应该是堆内内存,不会超过配置的JVM的最大heap的内存的,
> 只会jvm oom。超过yarn内存限制,那是因为还有jvm其他的overhead,加起来总量超了。
> 郑斌斌  于2020年9月23日周三 下午12:29写道:
>  我这边也是遇到同样的问题,简单的双流 interval join SQL 跑4-5天就会有发生超用内存,然后container 被YARN
> KILL 。
>  单流跑的话,比较正常。
>  JOB的内存是4G。版本1.11.1
>  --
>  发件人:Benchao Li 
>  发送时间:2020年9月23日(星期三) 10:50
>  收件人:user-zh 
>  主 题:Re: [flink-1.10.2] Blink SQL 超用内存严重
>
>  Hi Tianwang,
>
>  不知道你的DataStream是怎么实现的,只是从SQL的角度来看,有两个地方会导致状态的量会增加
>
>  1. time interval join会将watermark delay之后再发送,也就是说下游的hop窗口的状态会因为上游
>  join算子延迟了watermark发送而状态比正常情况下大一些。目前watermark的延迟是
>  `Math.max(leftRelativeSize, rightRelativeSize) +
>  allowedLateness`,根据你的SQL,这个值应该是6h
>  2. time interval join中为了防止频繁的遍历state,曾经做过一个优化。但是这个优化会导致那些过期数据的
>  清理被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
>