Can the History Server show aggregated TaskManager logs?

2021-05-06 Thread lhuiseu
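Hi,
Flink 1.12.0, on YARN mode.
Finished jobs can be found in the History Server, but viewing the TaskManager log through the Web UI returns 404. Does the Flink History Server currently not support viewing aggregated TaskManager logs? I would appreciate help from anyone who understands how the History Server works.
Thanks a lot.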

 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Fwd: Exception when connecting to Hive from the flink1.12.2 CLI

2021-05-06 Thread 张锴
It works after starting a yarn-session. Also, why is a yarn-session needed here?



Re: Fwd: Exception when connecting to Hive from the flink1.12.2 CLI

2021-05-06 Thread 张锴
It works after starting a yarn-session. Also, why is a yarn-session needed here? Don't the other modes work?



Question about Flink Upsert Dynamic Kafka Table unlimited expansion

2021-05-06 Thread vtygoss
Hi Community,


Recently I have been building a real-time data warehouse in the medical field
using Flink and Upsert-Kafka dynamic tables. The historical data must not
expire, so the changelog stream in Kafka grows without bound, and I have run
into a problem with the ever-growing data volume.


How can I solve this problem? Any suggestions are welcome. Thanks for your reply!

Re: Question regarding cpu limit config in Flink standalone mode

2021-05-06 Thread Xintong Song
Hi Fan,

For a Java application, you cannot specify how many CPUs a process should
use. The JVM process will always try to use as much CPU time as it needs.
The limit can only come from outside the JVM: hardware limits, OS scheduling,
cgroups, etc.
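
To check what limit the JVM actually observes from outside, a minimal sketch in plain Java may help. The class name `CpuProbe` is hypothetical and not part of Flink; the only real call is the standard `Runtime.availableProcessors()`. On container-aware JVM versions this value is expected to reflect a cgroup CPU limit, but that depends on the JVM version and flags in use.

```java
/** Hypothetical helper (not a Flink API): reports the CPU count the JVM sees. */
public class CpuProbe {

    /** Returns the number of processors the JVM believes it may use. */
    public static int visibleCpus() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        // The printed value is decided by the environment (hardware, OS, cgroups),
        // not by any setting inside the application itself.
        System.out.println("CPUs visible to this JVM: " + visibleCpus());
    }
}
```

Running this inside a TaskManager container with different pod CPU limits is one way to see the externally imposed limit change while the application stays the same.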

On Kubernetes, it is the pod's resource specification that decides how many
CPU resources a Flink JM/TM can use.
- For the standalone Kubernetes deployment, you can specify the pods'
resources in your yaml files.
- For the native Kubernetes deployment, TM pods are requested by Flink's
ResourceManager. Thus, the configuration option `kubernetes.taskmanager.cpu`
controls the CPU resources of the pods Flink requests from Kubernetes.

Thank you~

Xintong Song



On Fri, May 7, 2021 at 10:35 AM Fan Xie  wrote:

> Hi Flink Community,
>
> Recently I am working on an auto-scaling project that needs to dynamically
> adjust the cpu config of Flink standalone jobs . Our jobs will be running
> on *standalone* mode in a k8s cluster. After going through the
> configuration doc:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/,
> I can't find a config that can directly control the cpu of a standalone
> flink job. I can only see *kubernetes.taskmanager.cpu*, but looks like
> this config is only useful in native k8s mode. I also notice another
> config: *taskmanager.numberOfTaskSlots* that can control the cpu config
> in an indirect way. Is there any reason why we can't config the cpu for a
> standalone job directly?
>
> Thanks for answering my question.
>
> Best,
> Fan
>
>


Error when writing a custom source's output to a JDBC sink

2021-05-06 Thread TonyChen
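The Flink version is 1.12.0. I implemented a custom MQTT source. Flink SQL can read data from it and print it with connector='print', but once the output is switched to JDBC it fails, and not a single record reaches the database.

If I replace the source with datagen for testing, the rows are written through JDBC. So is the MQTT source the problem? Or is the MQTT data volume so large that the database causes backpressure? As I understand it, even under backpressure a few records should still be written to the database.

The error always stops at
https://github.com/apache/flink/blob/release-1.12.0/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/AbstractJdbcRowConverter.java#L220

It is always reported here, saying that statement is null.

Null values are already handled in the SQL: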
INSERT INTO ods_iot_message
SELECT IF(`tag` is null, '', `tag`),
   IF(`value` is null, '', `value`),
   IF(`quality` is null, 0, `quality`),
   IF(`topic` is null, '', `topic`),
   `ts`
FROM source2;


The full exception is as follows:
2021-05-07 08:58:18,540 ERROR com.boteratech.kunpeng.dlink.connector.mqtt.MqttSourceFunction  - +I(nxseq1474,0.644,0,nxs48,2021-05-07T09:00:39.064)
2021-05-07 08:58:18,540 ERROR com.boteratech.kunpeng.dlink.connector.mqtt.MqttSourceFunction  - Could not forward element to next operator
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
  at com.boteratech.kunpeng.dlink.connector.mqtt.MqttSourceFunction$1.messageArrived(MqttSourceFunction.java:130)
  at org.eclipse.paho.client.mqttv3.internal.CommsCallback.deliverMessage(CommsCallback.java:519)
  at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:417)
  at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:214)
  at java.lang.Thread.run(Thread.java:823)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
  at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  ... 11 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:88)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
  at StreamExecCalc$18.processElement(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  ... 17 more
Caused by: java.io.IOException: Writing records to JDBC failed.
  at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:159)
  at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
  at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
  at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:72)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  ... 23 more
Caused by: java.lang.NullPointerException
  at 

Re: Fwd: Exception when connecting to Hive from the flink1.12.2 CLI

2021-05-06 Thread guoyb
Check whether the yarn session was killed.



---Original message---
From: "Rui Li"

Re: Fwd: Exception when connecting to Hive from the flink1.12.2 CLI

2021-05-06 Thread Rui Li
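It looks like something went wrong while submitting the job. Check whether the local SQL client log has more detailed information. You could also try submitting in yarn session mode.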

On Fri, Apr 30, 2021 at 5:15 PM 张锴  wrote:

> I didn't specify it. I just ran a query and the ID was generated automatically. The submission apparently never fully succeeded, so the logs don't say where the problem is.
>
> Rui Li  wrote on Fri, Apr 30, 2021 at 4:51 PM:
>
> > Hello,
> >
> > The error message says that application_1605840182730_29292 cannot be found. Did you specify this ID when you submitted the job?
> >
> > On Thu, Apr 29, 2021 at 1:35 PM 张锴  wrote:
> >
> > > The production Hive here is not configured with Kerberos authentication.
> > >
> > > 张锴  wrote on Thu, Apr 29, 2021 at 10:05 AM:
> > >
> > > > Does the official site say so? Where did you find that?
> > > >
> > > > guoyb <861277...@qq.com> wrote on Wed, Apr 28, 2021 at 10:56 AM:
> > > >
> > > >> I have the same problem and haven't solved it. It is caused by Hive with Kerberos authentication.
> > > >>
> > > >> ---Original message---
> > > >> From: "张锴"
> > > >> Sent: Wednesday, April 28, 2021, 10:41 AM
> > > >> To: "user-zh"
> > > >> Subject: Fwd: Exception when connecting to Hive from the flink1.12.2 CLI
> > > >>
> > > >> -- Forwarded message -
> > > >> From: 张锴
> > > >> Date: Tue, Apr 27, 2021 at 1:59 PM
> > > >> Subject: Exception when connecting to Hive from the flink1.12.2 CLI
> > > >> To:
> > > >>
> > > >> *When connecting to Hive from the flink1.12.2 CLI, I can see the catalog and the tables under the specified catalog, but executing a select statement throws an exception.*
> > > >> [ERROR] Could not execute SQL statement. Reason:
> > > >> org.apache.hadoop.ipc.RemoteException: Application with id 'application_1605840182730_29292' doesn't exist in RM. Please check that the job submission was suc
> > > >> at org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:382)
> > > >> at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:
> > > >> at org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:561)
> > > >> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
> > > >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
> > > >> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876)
> > > >> at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822)
> > > >> at java.security.AccessController.doPrivileged(Native Method)
> > > >> at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> > > >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682)
> > > >>
> > > >> *When I check the logs with yarn logs -applicationId application_1605840182730_29292, no specific error is given. The log output is below; it shows almost nothing about the problem.*
> > > >> INFO client.RMProxy: Connecting to ResourceManager at hadoop01.xxx.xxx.xxx/xx.xx.x.xx:8050
> > > >> Unable to get ApplicationState. Attempting to fetch logs directly from the filesystem.
> > > >> Can not find the appOwner. Please specify the correct appOwner
> > > >> Could not locate application logs for application_1605840182730_29292
> > > >>
> > > >> How should I troubleshoot this? Has anyone run into a similar problem?
> > > >
> > > >
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li


Re: Several questions about upsert-kafka as a source

2021-05-06 Thread Shengkai Fang
Hi.

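1. The original design was fairly conservative; its main purpose was to be able to fill in the missing delete messages.
2. The core class is StreamExecChangelogNormalize [1].
3. Yes. Upsert-kafka currently requires records with the same key to be in the same partition. Kafka only guarantees reading in offset order within a partition, so if records with the same key are spread across different partitions, they will be read out of order.
4. Once data enters a specific operator, the operator does not distinguish which connector the data came from. If the partitioning strategies of the left and right inputs differ, the data will be shuffled.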
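
The requirement in point 3, that records with the same key land in the same partition, can be illustrated with a minimal plain-Java sketch. The class `KeyPartitioningSketch` and its simple `hashCode`-based mapping are assumptions made for illustration only; this is not the hashing used by Kafka's own partitioner or by Flink.

```java
/** Illustrative only: a simple hash partitioner, not Kafka's actual default partitioner. */
public class KeyPartitioningSketch {

    /** Maps a key to a partition index in the range [0, numPartitions). */
    public static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String[] keys = {"user-1", "user-2", "user-1", "user-3", "user-1"};
        for (String key : keys) {
            // Every occurrence of the same key prints the same partition number,
            // so all updates for that key are read back in offset order.
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Because the mapping is a pure function of the key, all updates for one key stay in a single partition, which is what lets per-partition offset order double as per-key order.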

Do you have a specific requirement? Also, could you say more about the specific restrictions in ksql?

Best,
Shengkai


[1]
https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java

macdoor  wrote on Thu, May 6, 2021 at 9:48 AM:

> I would also like to know whether Flink requires partitioning by primary key when joining Kafka messages. KSQL enforces this.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Why does the ColumnVector class have no implementation for the map type?

2021-05-06 Thread Jun Zhang
Hello everyone,
Why do the subclasses of org.apache.flink.table.data.vector.ColumnVector in Flink not include an implementation for the map type? What is the reason? Thanks.


Question regarding cpu limit config in Flink standalone mode

2021-05-06 Thread Fan Xie
Hi Flink Community,

I am currently working on an auto-scaling project that needs to dynamically 
adjust the CPU config of Flink standalone jobs. Our jobs will be running in 
standalone mode in a k8s cluster. After going through the configuration doc: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/,
 I can't find a config that directly controls the CPU of a standalone Flink 
job. I can only see kubernetes.taskmanager.cpu, but it looks like this config is 
only useful in native k8s mode. I also noticed another config, 
taskmanager.numberOfTaskSlots, that can control the CPU config in an indirect 
way. Is there any reason why we can't configure the CPU for a standalone job 
directly?

Thanks for answering my question.

Best,
Fan



Re: Table name for table created fromDataStream

2021-05-06 Thread Leonard Xu
Hi, tbud

You can register the Table API object as a temporary view and then run queries on it:

tableEnv.createTemporaryView("MyTable", eventsTable);
tableEnv.executeSql("SELECT * FROM MyTable").print();

Best,
Leonard

> On May 7, 2021, at 03:17, tbud  wrote:
> 
> Does anybody know how to set the name for the table created using
> fromDataStream() method ? Flink's documentation doesn't mention anything
> about this and when I went through the taskManager logs I saw some auto
> generated name like 'Unregistered_DataStream_5'.
> Here's my code :
> /StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>Table eventsTable =
>tableEnv.fromDataStream(
>eventStream,
>$("id"),
>$("orgId"));/
> Now if I want to run some sql query on this, using tableEnv.sqlQuery() where
> the SQL is the rule that I want to run on the events, so the SQL is read
> from the external source, I would need the table name for this table to be
> fixed so that the query writers can use that.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



The problem of getting data by Rest API

2021-05-06 Thread penguin.
On the Web UI page, we can see that the relevant data, such as the read-bytes 
of each operator, is updated every 3 seconds.
But when I get the data through the REST API, it is updated only every 6 seconds or 
even more than 10 seconds. Why?
The read-bytes data obtained through the REST API is as follows:


19:02:04       0       0       0       0       0       0       0
19:02:07       0       0       0       0       0       0       0
19:02:10       0       0       0       0       0       0       0
19:02:13       0       0       0       0       0       0       0

19:02:16  792820  807792  684683  796474  813133  680141  689590
19:02:19  792820  807792  684683  796474  813133  680141  689590
19:02:22  792820  807792  684683  796474  813133  680141  689590
19:02:26  792820  807792  684683  796474  813133  680141  689590

19:02:29 1817569 1815899 1560744 1808234 1836741 1559341 1568711
19:02:32 1817569 1815899 1560744 1808234 1836741 1559341 1568711
19:02:35 1817569 1815899 1560744 1808234 1836741 1559341 1568711
19:02:38 1817569 1815899 1560744 1808234 1836741 1559341 1568711

19:02:41 2796433 2805973 2401927 2797311 2838168 2397542 2417726
19:02:45 2796433 2805973 2401927 2797311 2838168 2397542 2417726
19:02:48 2796433 2805973 2401927 2797311 2838168 2397542 2417726

19:02:51 3649398 3640050 3130583 3624854 3687889 3122333 3144306
19:02:54 3649398 3640050 3130583 3624854 3687889 3122333 3144306
19:02:57 3649398 3640050 3130583 3624854 3687889 3122333 3144306
19:03:00 3649398 3640050 3130583 3624854 3687889 3122333 3144306

19:03:03 4529443 4524218 3881509 4517926 4582770 3884389 3907298
19:03:06 4529443 4524218 3881509 4517926 4582770 3884389 3907298
19:03:09 4529443 4524218 3881509 4517926 4582770 3884389 3907298

19:03:12 5432212 5423566 4659737 5404419 5492786 4649469 4685430
19:03:16 5432212 5423566 4659737 5404419 5492786 4649469 4685430
19:03:19 5432212 5423566 4659737 5404419 5492786 4649469 4685430
19:03:22 5432212 5423566 4659737 5404419 5492786 4649469 4685430

19:03:25 6255327 6243823 5367516 6236577 6321958 5360496 5396446
19:03:28 6255327 6243823 5367516 6236577 6321958 5360496 5396446
19:03:31 6255327 6243823 5367516 6236577 6321958 5360496 5396446

19:03:34 7207042 7203725 6184804 7178875 7286207 6172393 6212499
19:03:37 7207042 7203725 6184804 7178875 7286207 6172393 6212499
19:03:40 7207042 7203725 6184804 7178875 7286207 6172393 6212499
19:03:43 7207042 7203725 6184804 7178875 7286207 6172393 6212499

19:03:47 8064759 8048237 6918715 8034269 8144119 6912701 6957967
19:03:50 8064759 8048237 6918715 8034269 8144119 6912701 6957967
19:03:53 8064759 8048237 6918715 8034269 8144119 6912701 6957967

19:03:56 8903005 8895364 7636796 8880999 9004171 7628515 7684865
19:03:59 8903005 8895364 7636796 8880999 9004171 7628515 7684865
19:04:02 8903005 8895364 7636796 8880999 9004171 7628515 7684865

19:04:05 9764407

Question about monitoring data on the Flink web page

2021-05-06 Thread penguin.
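On the web UI page, the relevant data, such as the read-bytes of each operator, is updated about every 3 seconds.
But when I fetch the data through the API myself, it is updated only about every 6 seconds or even more than 10 seconds. Why is that?
The read-bytes data obtained through the API is as follows: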
19:02:04       0       0       0       0       0       0       0
19:02:07       0       0       0       0       0       0       0
19:02:10       0       0       0       0       0       0       0
19:02:13       0       0       0       0       0       0       0

19:02:16  792820  807792  684683  796474  813133  680141  689590
19:02:19  792820  807792  684683  796474  813133  680141  689590
19:02:22  792820  807792  684683  796474  813133  680141  689590
19:02:26  792820  807792  684683  796474  813133  680141  689590

19:02:29 1817569 1815899 1560744 1808234 1836741 1559341 1568711
19:02:32 1817569 1815899 1560744 1808234 1836741 1559341 1568711
19:02:35 1817569 1815899 1560744 1808234 1836741 1559341 1568711
19:02:38 1817569 1815899 1560744 1808234 1836741 1559341 1568711

19:02:41 2796433 2805973 2401927 2797311 2838168 2397542 2417726
19:02:45 2796433 2805973 2401927 2797311 2838168 2397542 2417726
19:02:48 2796433 2805973 2401927 2797311 2838168 2397542 2417726

19:02:51 3649398 3640050 3130583 3624854 3687889 3122333 3144306
19:02:54 3649398 3640050 3130583 3624854 3687889 3122333 3144306
19:02:57 3649398 3640050 3130583 3624854 3687889 3122333 3144306
19:03:00 3649398 3640050 3130583 3624854 3687889 3122333 3144306

19:03:03 4529443 4524218 3881509 4517926 4582770 3884389 3907298
19:03:06 4529443 4524218 3881509 4517926 4582770 3884389 3907298
19:03:09 4529443 4524218 3881509 4517926 4582770 3884389 3907298

19:03:12 5432212 5423566 4659737 5404419 5492786 4649469 4685430
19:03:16 5432212 5423566 4659737 5404419 5492786 4649469 4685430
19:03:19 5432212 5423566 4659737 5404419 5492786 4649469 4685430
19:03:22 5432212 5423566 4659737 5404419 5492786 4649469 4685430

19:03:25 6255327 6243823 5367516 6236577 6321958 5360496 5396446
19:03:28 6255327 6243823 5367516 6236577 6321958 5360496 5396446
19:03:31 6255327 6243823 5367516 6236577 6321958 5360496 5396446

19:03:34 7207042 7203725 6184804 7178875 7286207 6172393 6212499
19:03:37 7207042 7203725 6184804 7178875 7286207 6172393 6212499
19:03:40 7207042 7203725 6184804 7178875 7286207 6172393 6212499
19:03:43 7207042 7203725 6184804 7178875 7286207 6172393 6212499

19:03:47 8064759 8048237 6918715 8034269 8144119 6912701 6957967
19:03:50 8064759 8048237 6918715 8034269 8144119 6912701 6957967
19:03:53 8064759 8048237 6918715 8034269 8144119 6912701 6957967

19:03:56 8903005 8895364 7636796 8880999 9004171 7628515 7684865
19:03:59 8903005 8895364 7636796 8880999 9004171 7628515 7684865
19:04:02 8903005 8895364 7636796 8880999 9004171 7628515 7684865

19:04:05 9764407 9746490 8375615 9721560 9864876 8364266 8420637
19:04:08 9764407 9746490 8375615 9721560 9864876

Table name for table created fromDataStream

2021-05-06 Thread tbud
Does anybody know how to set the name for the table created using
fromDataStream() method ? Flink's documentation doesn't mention anything
about this and when I went through the taskManager logs I saw some auto
generated name like 'Unregistered_DataStream_5'.
Here's my code :
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    Table eventsTable =
        tableEnv.fromDataStream(
            eventStream,
            $("id"),
            $("orgId"));
Now if I want to run some sql query on this, using tableEnv.sqlQuery() where
the SQL is the rule that I want to run on the events, so the SQL is read
from the external source, I would need the table name for this table to be
fixed so that the query writers can use that.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
I am able to maintain a list state in a process function and aggregate the
values. How do I get a notification/event to remove a value from the
stored list state?
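
The expiry arithmetic behind this question (count a user's purchases in the trailing hour and drop older entries) can be sketched in plain Java. `TrailingWindowCounter` is a hypothetical helper, not a Flink API, and it assumes timestamps arrive in ascending order per key. In a Flink `KeyedProcessFunction`, the same eviction would typically be triggered from `onTimer` after registering a timer through the `TimerService`; that wiring is not shown here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not a Flink API): counts events per key in a trailing time window. */
public class TrailingWindowCounter {
    private final long windowMillis;
    private final Map<String, Deque<Long>> timestampsByKey = new HashMap<>();

    public TrailingWindowCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Records one event; timestamps are assumed to arrive in ascending order per key. */
    public void add(String key, long timestampMillis) {
        timestampsByKey.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(timestampMillis);
    }

    /** Evicts entries that fell out of the window, then returns how many remain. */
    public int countAt(String key, long nowMillis) {
        Deque<Long> timestamps = timestampsByKey.get(key);
        if (timestamps == null) {
            return 0;
        }
        // An entry is expired once it is a full window (or more) older than "now".
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= nowMillis - windowMillis) {
            timestamps.removeFirst();
        }
        return timestamps.size();
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        TrailingWindowCounter counter = new TrailingWindowCounter(60 * minute);
        counter.add("USER-A", (10 * 60 + 30) * minute); // purchase at 10:30
        counter.add("USER-A", (10 * 60 + 45) * minute); // purchase at 10:45
        System.out.println(counter.countAt("USER-A", (11 * 60 + 29) * minute)); // query at 11:29
        System.out.println(counter.countAt("USER-A", (11 * 60 + 40) * minute)); // query at 11:40
    }
}
```

With the two purchases from the example in this thread, the query at 11:29 still sees both entries, while the query at 11:40 sees only the 10:45 purchase because the 10:30 entry is more than an hour old by then.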

On Thu, May 6, 2021 at 8:47 PM Swagat Mishra  wrote:

> I meant "Do you recommend the state to be maintained in *ValueState* or an
> external store like Elastic?"
>
> On Thu, May 6, 2021 at 8:46 PM Swagat Mishra  wrote:
>
>> I want to aggregate the user activity, e.g. the number of products the user has
>> purchased in the last 1 hour.
>>
>> So User A (ID = USER-A) purchases a product at 10:30, another
>> product at 10:45 AM, and another product at 1:30 AM.
>>
>> My API should give 2 products purchased if the API call happens at 11:29
>> AM (10:30, 10:45) and 1 product if the API call happens at 1:45 AM.
>>
>> The API will access data persisted from the Flink streaming output.
>>
>> As of now I am doing a keyBy on (ID = USER-A).
>>
>> Do I have to maintain my own calculated state within the process
>> window function? Is the process window function shared across all keys, or is
>> there one instance per key? Do you recommend the state to be maintained in State
>> or Elastic?
>>
>> Also, if I change the processing to processing time instead of event
>> time, the aggregation happens. Is there any reason why Flink could not provide
>> event time aggregations like the processing time aggregation?
>>
>>
>>
>> On Thu, May 6, 2021 at 7:11 PM Arvid Heise  wrote:
>>
>>> I'm not sure what you want to achieve exactly.
>>>
>>> You can always keyby the values by a constant pseudo-key such that all
>>> values will be in the same partition (so instead of using global but with
>>> the same effect). Then you can use a process function to maintain the
>>> state. Just make sure that your data volume is low enough as this part is
>>> not parallelizable by definition.
>>>
>>> On Thu, May 6, 2021 at 10:09 AM Swagat Mishra 
>>> wrote:
>>>
>>>> Thank you.
>>>>
>>>> I will have a look at DataStream.global.
>>>>
>>>> Is there any other way to maintain state, such as by using ValueState?
>>>>
>>>>
>>>> On Thu, 6 May 2021 at 1:26 PM, Arvid Heise  wrote:
>>>>
>>>>> If you keyby then all direct functions see only the elements with the
>>>>> same key. So that's the expected behavior and the base of Flink's parallel
>>>>> processing capabilities.
>>>>>
>>>>> If you want to generate a window over all customers, you have to use a
>>>>> global window. However, that also means that no parallelization can happen,
>>>>> so I'd discourage that.
>>>>>
>>>>> A better way would be to perform as many calculations as possible in
>>>>> the process function (for example create a customer with buy information
>>>>> record) and then have a DataStream#global() reshuffle to collect all
>>>>> aggregated information on one node.
>>>>>
>>>>> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra 
>>>>> wrote:
>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>>>>>
>>>>>> Adding this to the source context worked.
>>>>>>
>>>>>> However, I am still getting only one customer in the process method. I
>>>>>> would expect the iterable to provide all customers in the window. Or do
>>>>>> I have to maintain state?
>>>>>>
>>>>>>
>>>>>> Changes for reference:
>>>>>>
>>>>>> I made the following change and also removed any lag that I had introduced
>>>>>> for experimentation earlier.
>>>>>>
>>>>>> So the trigger looks like:
>>>>>>
>>>>>>
>>>>>> @Override
>>>>>> public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>>     if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>>>         // if the watermark is already past the window fire immediately
>>>>>>         return TriggerResult.FIRE;
>>>>>>     } else {
>>>>>>         //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>>>         triggerContext.registerEventTimeTimer(customer.getEventTime());
>>>>>>         return TriggerResult.FIRE;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>>>>     return time == timeWindow.maxTimestamp() ?
>>>>>>             TriggerResult.FIRE :
>>>>>>             TriggerResult.CONTINUE;
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>>>>>>     return TriggerResult.CONTINUE;
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>>>>>>     ctx.deleteEventTimeTimer(window.maxTimestamp());
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public boolean canMerge() {
>>>>>>     return true;
>>>>>> }

Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread vishalovercome
Thank you for answering all my questions. My suggestion would be to start off
with exposing an API to allow dynamically changing operator parallelism, as
the users of Flink will be better able to decide the right scaling policy.
Once this functionality is there, it's just a matter of providing policies
(ratio based, throughput based, back-pressure based). The web UI could be
used for setting parallelism as well. 

An analogy would be autoscaling provided by cloud providers. The features
provided are:
1. Web UI for manually overriding parallelism (min, max, desired)
2. Metric based scaling policies

It will be difficult for developers to think of a reasonable value for
maxParallelism for each operator and, as I explained above, sometimes even
a small increase in parallelism is enough to bring things down. A UI /
external policy based approach will allow for quick experimentation and fine
tuning. I don't think it will be possible for Flink developers to build a
one-size-fits-all solution.





Re: Unsubscribe

2021-05-06 Thread Dan Pettersson
I've also tried a few times over the last couple of months. I think it would
be very nice if the "flink admin" could look into this, instead of us
reaching out to the Apache Infrastructure team.

Thanks,

/Dan

On Thu, May 6, 2021 at 1:31 PM, Chesnay Schepler  wrote:

> Could you reach out to the Apache Infrastructure team
>  about not being able to
> unsubscribe? Maybe this functionality is currently broken.
>
> On 5/6/2021 12:48 PM, Andrew Kramer wrote:
>
> I have been unable to unsubscribe as well. Have tried emailing just like
> you
>
> On Thu, May 6, 2021 at 3:33 AM Xander Song  wrote:
>
>> How can I unsubscribe from the Apache Flink user mailing list? I have
>> tried emailing user-unsubscr...@flink.apache.org, but am still receiving
>> messages.
>>
>> Thank you.
>>
>
>


Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread vishalovercome
We do exactly what you mentioned. However, it's not that simple
unfortunately. Our services don't have a predictable performance as traffic
varies a lot during the day. 

As I've explained above, increasing source parallelism to 2 was enough to tip
over our services, and reducing the parallelism of the async operator to 2 is
not an option.





Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread vishalovercome
I am using the async IO operator. The problem is that increasing source
parallelism from 1 to 2 was enough to tip our systems over the edge.
Reducing the parallelism of async IO operator to 2 is not an option as that
would reduce the throughput quite a bit. This means that no matter what we
do, we'll end up with different operators with different parallelism. 

What I meant with: "running all operators at such a high scale would result
in wastage of resources, even with operator chaining in place." was that
creating as many subtasks as that of the windowing operator for each of my
operators would lead to sub-optimal performance. While chaining would ensure
that all tasks would run in one slot, the partitioning of data would result
in the same network IO as chaining doesn't guarantee that the same tuple is
processed in 1 slot. 

In my experience, running every operator with the same parallelism is
always inferior to hand-tuned parallelism.





Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
I meant "Do you recommend the state to be maintained in *ValueState* or an
external store like elastic?"

On Thu, May 6, 2021 at 8:46 PM Swagat Mishra  wrote:

> I want to aggregate the user activity e.g number of products the user has
> purchased in the last 1 hour.
>
> so - User A (ID = USER-A) purchases 1 product at 10:30 AM, another product
> at 10:45 AM, and another product at 1:30 AM.
>
> My API should give 2 products purchased if the API call happens at 11:29
> AM (10:30 , 10:45) and 1 product if the API call happens at 1:45 AM
>
> The API will access data persisted from the flink streaming output.
>
> As of now I am doing keyby on (ID = USER-A) .
>
> Do I have to maintain my own calculated state within the process window
> function? Is the process window function shared across all keys, or is
> there one instance per key? Do you recommend the state to be maintained in
> State or elastic?
>
> Also, if I change the processing to processing time instead of event time,
> the aggregation is happening. Is there any reason why Flink could not provide
> event time aggregations like the processing time aggregation?
>
>
>
> On Thu, May 6, 2021 at 7:11 PM Arvid Heise  wrote:
>
>> I'm not sure what you want to achieve exactly.
>>
>> You can always keyby the values by a constant pseudo-key such that all
>> values will be in the same partition (so instead of using global but with
>> the same effect). Then you can use a process function to maintain the
>> state. Just make sure that your data volume is low enough as this part is
>> not parallelizable by definition.
>>
>> On Thu, May 6, 2021 at 10:09 AM Swagat Mishra  wrote:
>>
>>> thank you
>>>
>>> I will have a look at DataStream.global
>>>
>>> Is there any other way to maintain state, such as using ValueState?
>>>
>>>
>>> On Thu, 6 May 2021 at 1:26 PM, Arvid Heise  wrote:
>>>
>>>> If you keyby then all direct functions see only the elements with the
>>>> same key. So that's the expected behavior and the base of Flink's parallel
>>>> processing capabilities.
>>>>
>>>> If you want to generate a window over all customers, you have to use a
>>>> global window. However, that also means that no parallelization can happen,
>>>> so I'd discourage that.
>>>>
>>>> A better way would be to perform as many calculations as possible in
>>>> the process function (for example create a customer with buy information
>>>> record) and then have a DataStream#global() reshuffle to collect all
>>>> aggregated information on one node.
>>>>
>>>> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra wrote:
>>>>
>>>>> Thank you.
>>>>>
>>>>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>>>>
>>>>> Adding this to the source context worked.
>>>>>
>>>>> However, I am still getting only one customer in the process method. I
>>>>> would expect the iterable to provide all customers in the window. Or do I
>>>>> have to maintain state?
>>>>>
>>>>>
>>>>> changes for reference:
>>>>>
>>>>> I made the following change, also removed any lag that I had introduced
>>>>> for experimentation earlier.
>>>>>
>>>>> so trigger looks like:
>>>>>
>>>>>
>>>>> @Override
>>>>> public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>     if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>>         // if the watermark is already past the window fire immediately
>>>>>         return TriggerResult.FIRE;
>>>>>     } else {
>>>>>         //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>>         triggerContext.registerEventTimeTimer(customer.getEventTime());
>>>>>         return TriggerResult.FIRE;
>>>>>     }
>>>>> }
>>>>>
>>>>> @Override
>>>>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>>>     return time == timeWindow.maxTimestamp() ?
>>>>>             TriggerResult.FIRE :
>>>>>             TriggerResult.CONTINUE;
>>>>> }
>>>>>
>>>>> @Override
>>>>> public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>>>>>     return TriggerResult.CONTINUE;
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>>>>>     ctx.deleteEventTimeTimer(window.maxTimestamp());
>>>>> }
>>>>>
>>>>> @Override
>>>>> public boolean canMerge() {
>>>>>     return true;
>>>>> }
>>>>>
>>>>> and *removed lateness*
>>>>>
>>>>> customerStream
>>>>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>>         .keyBy((KeySelector) Customer::getIdentifier)
>>>>>

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
I want to aggregate the user activity e.g number of products the user has
purchased in the last 1 hour.

so - User A (ID = USER-A) purchases 1 product at 10:30 AM, another product
at 10:45 AM, and another product at 1:30 AM.

My API should give 2 products purchased if the API call happens at 11:29 AM
(10:30 , 10:45) and 1 product if the API call happens at 1:45 AM

The API will access data persisted from the flink streaming output.

As of now I am doing keyby on (ID = USER-A) .

Do I have to maintain my own calculated state within the process window
function? Is the process window function shared across all keys, or is there
one instance per key? Do you recommend the state to be maintained in State or
elastic?

Also, if I change the processing to processing time instead of event time,
the aggregation is happening. Is there any reason why Flink could not provide
event time aggregations like the processing time aggregation?
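
For reference, the rolling one-hour count described above can be sketched in
plain Java. This is only an illustration of the per-key bookkeeping, not Flink
API code: PurchaseCounter, recordPurchase, countLastHour and at are
hypothetical names, the HashMap stands in for what would be keyed state in a
Flink job, and treating the third purchase as 13:30 is an assumption made so
that the example times are in ascending order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of Flink: trailing one-hour purchase count per user. */
final class PurchaseCounter {
    private static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    // One queue of purchase timestamps per user ID. In a Flink job this would
    // live in keyed state instead of a HashMap (assumption for illustration).
    private final Map<String, Deque<Long>> purchasesByUser = new HashMap<>();

    /** Records one purchase. Assumes timestamps arrive in ascending order per user. */
    void recordPurchase(String userId, long eventTimeMs) {
        purchasesByUser.computeIfAbsent(userId, id -> new ArrayDeque<>()).addLast(eventTimeMs);
    }

    /**
     * Counts purchases with (queryTimeMs - 1h) < timestamp <= queryTimeMs.
     * Expired entries are dropped, so query times must not go backwards.
     */
    int countLastHour(String userId, long queryTimeMs) {
        Deque<Long> timestamps = purchasesByUser.get(userId);
        if (timestamps == null) {
            return 0;
        }
        // Evict everything that is one hour old or older.
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= queryTimeMs - ONE_HOUR_MS) {
            timestamps.removeFirst();
        }
        int count = 0;
        for (long ts : timestamps) {
            if (ts <= queryTimeMs) {
                count++;
            }
        }
        return count;
    }

    /** Converts a wall-clock time on a single day to milliseconds since midnight. */
    static long at(int hour, int minute) {
        return (hour * 60L + minute) * 60L * 1000L;
    }

    public static void main(String[] args) {
        PurchaseCounter counter = new PurchaseCounter();
        counter.recordPurchase("USER-A", at(10, 30));
        counter.recordPurchase("USER-A", at(10, 45));
        counter.recordPurchase("USER-A", at(13, 30));
        System.out.println(counter.countLastHour("USER-A", at(11, 29)));
        System.out.println(counter.countLastHour("USER-A", at(13, 45)));
    }
}
```

The state here is one small queue per key, which is why keeping it inside the
job is feasible as long as the number of purchases per user per hour stays
modest.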



On Thu, May 6, 2021 at 7:11 PM Arvid Heise  wrote:

> I'm not sure what you want to achieve exactly.
>
> You can always keyby the values by a constant pseudo-key such that all
> values will be in the same partition (so instead of using global but with
> the same effect). Then you can use a process function to maintain the
> state. Just make sure that your data volume is low enough as this part is
> not parallelizable by definition.
>
> On Thu, May 6, 2021 at 10:09 AM Swagat Mishra  wrote:
>
>> thank you
>>
>> I will have a look at DataStream.global
>>
>> Is there any other way to maintain state, such as using ValueState?
>>
>>
>> On Thu, 6 May 2021 at 1:26 PM, Arvid Heise  wrote:
>>
>>> If you keyby then all direct functions see only the elements with the
>>> same key. So that's the expected behavior and the base of Flink's parallel
>>> processing capabilities.
>>>
>>> If you want to generate a window over all customers, you have to use a
>>> global window. However, that also means that no parallelization can happen,
>>> so I'd discourage that.
>>>
>>> A better way would be to perform as many calculations as possible in the
>>> process function (for example create a customer with buy information
>>> record) and then have a DataStream#global() reshuffle to collect all
>>> aggregated information on one node.
>>>
>>> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra  wrote:
>>>
>>>> Thank you.
>>>>
>>>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>>>
>>>> Adding this to the source context worked.
>>>>
>>>> However, I am still getting only one customer in the process method. I
>>>> would expect the iterable to provide all customers in the window. Or do I
>>>> have to maintain state?
>>>>
>>>>
>>>> changes for reference:
>>>>
>>>> I made the following change, also removed any lag that I had introduced
>>>> for experimentation earlier.
>>>>
>>>> so trigger looks like:
>>>>
>>>>
>>>> @Override
>>>> public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>     if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>         // if the watermark is already past the window fire immediately
>>>>         return TriggerResult.FIRE;
>>>>     } else {
>>>>         //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>         triggerContext.registerEventTimeTimer(customer.getEventTime());
>>>>         return TriggerResult.FIRE;
>>>>     }
>>>> }
>>>>
>>>> @Override
>>>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>>     return time == timeWindow.maxTimestamp() ?
>>>>             TriggerResult.FIRE :
>>>>             TriggerResult.CONTINUE;
>>>> }
>>>>
>>>> @Override
>>>> public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>>>>     return TriggerResult.CONTINUE;
>>>> }
>>>>
>>>> @Override
>>>> public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>>>>     ctx.deleteEventTimeTimer(window.maxTimestamp());
>>>> }
>>>>
>>>> @Override
>>>> public boolean canMerge() {
>>>>     return true;
>>>> }
>>>>
>>>> and *removed lateness*
>>>>
>>>> customerStream
>>>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>         .keyBy((KeySelector) Customer::getIdentifier)
>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>>>         //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>         .process(new CustomAggregateFunction());
>>>>
>>>>
>>>> On Thu, May 6, 2021 at 12:32 PM Arvid Heise wrote:


Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread Till Rohrmann
Yes, exposing an API to adjust the parallelism of individual operators is
definitely a good step towards the auto-scaling feature which we will
consider. The missing piece is persisting this information so that in case
of recovery you don't recover with a completely different parallelism.

I also agree that it will be very hard for Flink to decide on the best
parallelism in general. To do that, you usually need to know a bit about
the application logic. Hence, outsourcing this problem to the user, who can
make better decisions, is a very good idea.

The community will keep improving this feature so that with the next releases
it should become more powerful.

Cheers,
Till

On Thu, May 6, 2021 at 2:38 PM vishalovercome  wrote:

> Thank you for answering all my questions. My suggestion would be to start
> off
> with exposing an API to allow dynamically changing operator parallelism as
> the users of flink will be better able to decide the right scaling policy.
> Once this functionality is there, it's just a matter of providing policies
> (ratio based, throughput based, back-pressure based). The web UI could be
> used for setting parallelism as well.
>
> An analogy would be autoscaling provided by cloud providers. The features
> provided are:
> 1. Web UI for manually overriding parallelism (min, max, desired)
> 2. Metric based scaling policies
>
> It will be difficult for developers to think of a reasonable value for
> maxParallelism for each operator and like I explained above, sometimes even
> a small increase in parallelism is enough to bring things down. A UI /
> external policy based approach will allow for quick experimentation and
> fine
> tuning. I don't think it will be possible for Flink developers to build a
> one-size-fits-all solution.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Interacting with flink-jobmanager via CLI in separate pod

2021-05-06 Thread Robert Cullen
I resolved this by changing the jobmanager-rest-service.yaml (changed type
to ClusterIP and removed nodePort):

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: ClusterIP
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    #nodePort: 30081
  selector:
    app: flink
    component: jobmanager


On Wed, May 5, 2021 at 10:28 PM Yang Wang  wrote:

> It seems that you are using the NodePort to expose the rest service. If
> you only want to access the Flink UI/rest in the K8s cluster,
> then I would suggest to set "kubernetes.rest-service.exposed.type" to
> "ClusterIP". Because we are using the K8s master node to
> construct the JobManager rest endpoint when using NodePort. Sometimes it
> is not accessible because of a firewall.
>
> Best,
> Yang
>
> On Thu, May 6, 2021 at 2:08 AM Robert Metzger wrote:
>
>> Okay, it appears to have resolved 10.43.0.1:30081 as the address of the
>> JobManager. Most likely, the container can not access this address. Can you
>> validate this from within the container?
>>
>> If I understand the Flink documentation correctly, you should be able to
>> manually specify rest.address, rest.port for the JobManager address. If
>> you can manually figure out an address to the JobManager service, and pass
>> it to Flink, the submission should work.
>>
>> On Wed, May 5, 2021 at 7:15 PM Robert Cullen 
>> wrote:
>>
>>> Thanks for the reply. Here is an updated exception with DEBUG on. It
>>> appears to be timing out:
>>>
>>> 2021-05-05 16:56:19,700 DEBUG 
>>> org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory [] - Setting 
>>> namespace of Kubernetes client to cmdaa
>>> 2021-05-05 16:56:19,700 DEBUG 
>>> org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory [] - Setting 
>>> max concurrent requests of Kubernetes client to 64
>>> 2021-05-05 16:56:20,176 INFO  
>>> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
>>> flink cluster flink-jobmanager successfully, JobManager Web Interface: 
>>> http://10.43.0.1:30081
>>> 2021-05-05 16:56:20,239 INFO  org.apache.flink.client.cli.CliFrontend   
>>>[] - Waiting for response...
>>> 2021-05-05 17:02:09,605 ERROR org.apache.flink.client.cli.CliFrontend   
>>>[] - Error while running the command.
>>> org.apache.flink.util.FlinkException: Failed to retrieve job list.
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:449) 
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:430) 
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
>>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:427) 
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060) 
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>>>  [flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) 
>>> [flink-dist_2.12-1.13.0.jar:1.13.0]
>>> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
>>> Could not complete the operation. Number of retries has been exhausted.
>>> at 
>>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
>>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>>>  ~[?:1.8.0_292]
>>> at 
>>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>>>  ~[?:1.8.0_292]
>>> at 
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>>>  ~[?:1.8.0_292]
>>> at 
>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>>>  ~[?:1.8.0_292]
>>> at 
>>> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
>>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
>>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
>>>  ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> at 
>>> 

Re: Session Windows - not working as expected

2021-05-06 Thread Arvid Heise
I'm not sure what you want to achieve exactly.

You can always keyby the values by a constant pseudo-key such that all
values will be in the same partition (so instead of using global but with
the same effect). Then you can use a process function to maintain the
state. Just make sure that your data volume is low enough as this part is
not parallelizable by definition.

On Thu, May 6, 2021 at 10:09 AM Swagat Mishra  wrote:

> thank you
>
> I will have a look at DataStream.global
>
> Is there any other way to maintain state, such as using ValueState?
>
>
> On Thu, 6 May 2021 at 1:26 PM, Arvid Heise  wrote:
>
>> If you keyby then all direct functions see only the elements with the
>> same key. So that's the expected behavior and the base of Flink's parallel
>> processing capabilities.
>>
>> If you want to generate a window over all customers, you have to use a
>> global window. However, that also means that no parallelization can happen,
>> so I'd discourage that.
>>
>> A better way would be to perform as many calculations as possible in the
>> process function (for example create a customer with buy information
>> record) and then have a DataStream#global() reshuffle to collect all
>> aggregated information on one node.
>>
>> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra  wrote:
>>
>>> Thank you.
>>>
>>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>>
>>> Adding this to the source context worked.
>>>
>>> However, I am still getting only one customer in the process method. I would
>>> expect the iterable to provide all customers in the window. Or do I have to
>>> maintain state?
>>>
>>>
>>> changes for reference:
>>>
>>> I made the following change, also removed any lag that I had introduced
>>> for experimentation earlier.
>>>
>>> so trigger looks like:
>>>
>>>
>>> @Override
>>> public TriggerResult onElement(Customer customer, long l, TimeWindow 
>>> timeWindow, TriggerContext triggerContext) throws Exception {
>>> if (timeWindow.maxTimestamp() <= 
>>> triggerContext.getCurrentWatermark()) {
>>> // if the watermark is already past the window fire immediately
>>> return TriggerResult.FIRE;
>>> } else {
>>> //LOGGER.info("Max timestamp for customer: " + 
>>> customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>> triggerContext.registerEventTimeTimer(customer.getEventTime()); 
>>>return TriggerResult.FIRE;
>>>
>>> }
>>> }
>>>
>>> @Override
>>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, 
>>> TriggerContext triggerContext) {
>>> return time == timeWindow.maxTimestamp() ?
>>> TriggerResult.FIRE :
>>> TriggerResult.CONTINUE;
>>> }
>>>
>>> @Override
>>> public TriggerResult onProcessingTime(long time, TimeWindow window, 
>>> TriggerContext ctx) throws Exception {
>>> return TriggerResult.CONTINUE;
>>> }
>>>
>>> @Override
>>> public void clear(TimeWindow window, TriggerContext ctx) throws 
>>> Exception {
>>> ctx.deleteEventTimeTimer(window.maxTimestamp());
>>> }
>>>
>>> @Override
>>> public boolean canMerge() {
>>> return true;
>>> }
>>>
>>> and *removed lateness*
>>>
>>> customerStream
>>>
>>> 
>>> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>> .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>> .keyBy((KeySelector) Customer::getIdentifier)
>>> .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>> //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>> .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>> .process(new CustomAggregateFunction());
>>>
>>>
>>> On Thu, May 6, 2021 at 12:32 PM Arvid Heise  wrote:
>>>
>>>> Your source is not setting the timestamp with collectWithTimestamp.
>>>> I'm assuming that nothing really moves from the event time's perspective.
>>>>
>>>> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra wrote:
>>>>
>>>>> Yes, the customer generator is setting the event timestamp correctly, as
>>>>> shown below. I debugged and found that the events are getting late, so never
>>>>> executed, i.e., in the window operator the method this.isWindowLate(
>>>>> actualWindow) is evaluating to false for the rest of the events
>>>>> except the first, hence the events are getting skipped. I am not able to
>>>>> figure out where exactly the issue is.
>>>>>
>>>>> I have removed the evictor because I don't think I need it yet.
>>>>>
>>>>> stream looks like
>>>>>
>>>>> customerStream
>>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>>         .keyBy((KeySelector) Customer::getIdentifier)
>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
>>>>>         .trigger(new EventTimeTrigger())
>>>>>

Re: Unsubscribe

2021-05-06 Thread Chesnay Schepler
Could you reach out to the Apache Infrastructure team about not being able
to unsubscribe? Maybe this functionality is currently broken.

On 5/6/2021 12:48 PM, Andrew Kramer wrote:
> I have been unable to unsubscribe as well. Have tried emailing just
> like you
>
> On Thu, May 6, 2021 at 3:33 AM Xander Song wrote:
>
>> How can I unsubscribe from the Apache Flink user mailing list? I
>> have tried emailing user-unsubscr...@flink.apache.org, but am still
>> receiving messages.
>>
>> Thank you.




Re: Unsubscribe

2021-05-06 Thread Andrew Kramer
I have been unable to unsubscribe as well. Have tried emailing just like
you

On Thu, May 6, 2021 at 3:33 AM Xander Song  wrote:

> How can I unsubscribe from the Apache Flink user mailing list? I have
> tried emailing user-unsubscr...@flink.apache.org, but am still receiving
> messages.
>
> Thank you.
>


callback by using process function

2021-05-06 Thread Abdullah bin Omar
Hi,

According to [1] example section,

(i) If we check the stored count of the last modification time against the
previous timestamp count, then emit the count if they (count from last
modification time) match with the previous timestamp count.

Is there a reference to checking the previous count? Am I understanding this
correctly? Please help me understand this part.

(ii) Can the process function be used to look back at the previous key/count?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/

Thank you
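
For readers following the question, the count-with-timeout logic of the
example in [1] can be sketched in plain Java as below. This is a simplified
simulation rather than Flink code: CountWithTimeoutSketch and its methods are
hypothetical names, the HashMap stands in for keyed state, and timers are
fired by hand instead of by the timer service. As far as I can tell from [1],
the check in onTimer compares the firing timer's timestamp with the stored
last-modification time plus the timeout, not with a previous count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation (hypothetical, no Flink dependency) of a per-key count with timeout. */
final class CountWithTimeoutSketch {
    static final long TIMEOUT_MS = 60_000L;

    /** Per-key state: running count and timestamp of the last element seen. */
    static final class CountWithTimestamp {
        long count;
        long lastModified;
    }

    // Stand-in for keyed state: one entry per key.
    private final Map<String, CountWithTimestamp> state = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    /** Updates the count for the key and remembers when it was last modified. */
    void processElement(String key, long timestamp) {
        CountWithTimestamp current = state.computeIfAbsent(key, k -> new CountWithTimestamp());
        current.count++;
        current.lastModified = timestamp;
        // In a real job a timer for (timestamp + TIMEOUT_MS) would be registered here.
    }

    /** Emits the count only if no newer element arrived since this timer was set. */
    void onTimer(String key, long timerTimestamp) {
        CountWithTimestamp result = state.get(key);
        if (result != null && timerTimestamp == result.lastModified + TIMEOUT_MS) {
            emitted.add(key + "=" + result.count);
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        CountWithTimeoutSketch sketch = new CountWithTimeoutSketch();
        sketch.processElement("a", 1_000L);
        sketch.processElement("a", 5_000L);
        sketch.onTimer("a", 61_000L); // stale timer from the first element, ignored
        sketch.onTimer("a", 65_000L); // timer from the latest element, emits
        System.out.println(sketch.emitted());
    }
}
```

In this sketch the previous count is always available through the per-key
state entry, which is the sense in which a process function can look back at
earlier values for the same key.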


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-06 Thread Rui Li
Thanks to Dawid and Guowei for the great work!

On Thu, May 6, 2021 at 4:48 PM Zhu Zhu  wrote:

> Thanks Dawid and Guowei for being the release managers! And thanks
> everyone who has made this release possible!
>
> Thanks,
> Zhu
>
> On Thu, May 6, 2021 at 2:30 PM Yun Tang wrote:
>
>> Thanks for Dawid and Guowei's great work, and thanks for everyone
>> involved for this release.
>>
>> Best
>> Yun Tang
>> --
>> *From:* Xintong Song 
>> *Sent:* Thursday, May 6, 2021 12:08
>> *To:* user ; dev 
>> *Subject:* Re: [ANNOUNCE] Apache Flink 1.13.0 released
>>
>> Thanks Dawid & Guowei as the release managers, and everyone who has
>> contributed to this release.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Thu, May 6, 2021 at 9:51 AM Leonard Xu  wrote:
>>
>> > Thanks Dawid & Guowei for the great work, thanks everyone involved.
>> >
>> > Best,
>> > Leonard
>> >
>> > On May 5, 2021, at 17:12, Theo Diefenthal wrote:
>> >
>> > Thanks for managing the release. +1. I like the focus on improving
>> > operations with this version.
>> >
>> > --
>> > *From:* "Matthias Pohl"
>> > *To:* "Etienne Chauchot"
>> > *CC:* "dev", "Dawid Wysakowicz" <
>> > dwysakow...@apache.org>, "user",
>> > annou...@apache.org
>> > *Sent:* Tuesday, May 4, 2021 21:53:31
>> > *Subject:* Re: [ANNOUNCE] Apache Flink 1.13.0 released
>> >
>> > Yes, thanks for managing the release, Dawid & Guowei! +1
>> >
>> > On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot 
>> > wrote:
>> >
>> >> Congrats to everyone involved !
>> >>
>> >> Best
>> >>
>> >> Etienne
>> >> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
>> >>
>> >> The Apache Flink community is very happy to announce the release of
>> >> Apache Flink 1.13.0.
>> >>
>> >> Apache Flink® is an open-source stream processing framework for
>> >> distributed, high-performing, always-available, and accurate data
>> streaming
>> >> applications.
>> >>
>> >> The release is available for download at:
>> >> https://flink.apache.org/downloads.html
>> >>
>> >> Please check out the release blog post for an overview of the
>> >> improvements for this bugfix release:
>> >> https://flink.apache.org/news/2021/05/03/release-1.13.0.html
>> >>
>> >> The full release notes are available in Jira:
>> >>
>> >>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287
>> >>
>> >> We would like to thank all contributors of the Apache Flink community
>> who
>> >> made this release possible!
>> >>
>> >> Regards,
>> >> Guowei & Dawid
>> >>
>> >>
>> >
>> >
>>
>

-- 
Best regards!
Rui Li


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-06 Thread Zhu Zhu
Thanks Dawid and Guowei for being the release managers! And thanks everyone
who has made this release possible!

Thanks,
Zhu

On Thu, May 6, 2021 at 2:30 PM Yun Tang wrote:

> Thanks for Dawid and Guowei's great work, and thanks for everyone involved
> for this release.
>
> Best
> Yun Tang
> --
> *From:* Xintong Song 
> *Sent:* Thursday, May 6, 2021 12:08
> *To:* user ; dev 
> *Subject:* Re: [ANNOUNCE] Apache Flink 1.13.0 released
>
> Thanks Dawid & Guowei as the release managers, and everyone who has
> contributed to this release.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, May 6, 2021 at 9:51 AM Leonard Xu  wrote:
>
> > Thanks Dawid & Guowei for the great work, thanks everyone involved.
> >
> > Best,
> > Leonard
> >
> > On May 5, 2021, at 17:12, Theo Diefenthal wrote:
> >
> > Thanks for managing the release. +1. I like the focus on improving
> > operations with this version.
> >
> > --
> > *From:* "Matthias Pohl"
> > *To:* "Etienne Chauchot"
> > *CC:* "dev", "Dawid Wysakowicz" <
> > dwysakow...@apache.org>, "user",
> > annou...@apache.org
> > *Sent:* Tuesday, May 4, 2021 21:53:31
> > *Subject:* Re: [ANNOUNCE] Apache Flink 1.13.0 released
> >
> > Yes, thanks for managing the release, Dawid & Guowei! +1
> >
> > On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot 
> > wrote:
> >
> >> Congrats to everyone involved !
> >>
> >> Best
> >>
> >> Etienne
> >> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
> >>
> >> The Apache Flink community is very happy to announce the release of
> >> Apache Flink 1.13.0.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> >> distributed, high-performing, always-available, and accurate data
> streaming
> >> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html
> >>
> >> Please check out the release blog post for an overview of the
> >> improvements for this bugfix release:
> >> https://flink.apache.org/news/2021/05/03/release-1.13.0.html
> >>
> >> The full release notes are available in Jira:
> >>
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287
> >>
> >> We would like to thank all contributors of the Apache Flink community
> who
> >> made this release possible!
> >>
> >> Regards,
> >> Guowei & Dawid
> >>
> >>
> >
> >
>


Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread Till Rohrmann
Hi Vishal,

thanks a lot for all your feedback on the new reactive mode. I'll try to
answer your questions.

0. In order to avoid confusion let me quickly explain a bit of terminology:
The reactive mode is the new feature that allows Flink to react to newly
available resources and to make use of them. In order to achieve this, it
uses the newly introduced AdaptiveScheduler which works by declaring the
required resources and adapting the job if it loses slots or receives slots
if the job is running below its configured parallelism. The
AdaptiveScheduler can also be used w/o the reactive mode which would give
you the capability that Flink would be able to continue executing your job
if your cluster loses TaskManagers (with the default scheduler, the job
would then fail with not enough resources). The difference between the
reactive and non-reactive mode is that in the reactive mode, Flink ignores
the configured parallelism value and tries to run the job at the
maxParallelism (so to say the maximum possible parallelism your job can be
run with).

1. The AdaptiveScheduler and thereby also the reactive mode uses a simple
slot distribution mechanism where every slot sharing group gets the same
number of slots. The parallelism of an operator in this slot sharing group
is then the minimum of the number of slots and the configured parallelism
(when using the reactive mode it would be the configured maxParallelism).
This is of course not ideal and can lead to unused slots. Moreover, it does
not support scaling different slot sharing groups independently. This is a
limitation of the current implementation.

2. With the reactive mode, external systems can control the parallelism
with which a job is running by starting and stopping TaskManagers.
Admittedly, this does not give you super fine-grained control over the
running operators. Being able to specify ratios for operators could be a
good extension.

3. Local recovery simply has not been implemented because of scoping
reasons. There is nothing fundamentally preventing this from working, it
just hasn't been implemented yet.

4. No, there are currently no separate metrics for restarts and rescaling
operations. I do see the usefulness of such a metric. However, for the
reactive mode where scaling down means that we kill a TaskManager, I am not
entirely sure how we will be able to distinguish this from any other reason
which can kill a TaskManager. The only way I could see this work is by
telling Flink about the killing of a TaskManager.

5. No, Flink is not able to do this kind of operator-based optimizations at
the moment. I think this could be possible once we have implemented the
next step of the reactive mode which is proper auto-scaling.

6. From a high-level perspective, there is not much difference between the
reactive mode and manually taking a savepoint and resuming the job from it
with a changed parallelism. That's effectively also what Flink does
internally. The difference is that this operation is now automated and that
Flink can also handle situations where you don't get all the resources
after a restart, where the manual approach would simply fail.

7. Setting the maxParallelism is only required for the reactive mode, and
only if you don't want to run an operator with the default maxParallelism
value. By definition, the maxParallelism defines the maximum parallelism you
can run your operator with. Hence, if you set this value, you should be sure
that you never have to run your job with a higher parallelism than that.
Note that the reactive mode will try to run the operator with this
parallelism. However, if it has fewer resources, it will run the operators
at a lower parallelism. So the maxParallelism defines the upper bound for
the parallelism of your operator.
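
To make points 1 and 7 concrete, here is a minimal sketch in plain Java. It
is not Flink's scheduler code; the class and method names are made up for
illustration, and the even split via integer division is an assumption about
how "every slot sharing group gets the same number of slots" works out.

```java
/** Illustrative only: mirrors the rule described above, not Flink's scheduler implementation. */
final class SlotDistributionSketch {

    /** Assumption: available slots are split evenly across slot sharing groups (integer division). */
    static int slotsPerGroup(int availableSlots, int slotSharingGroups) {
        if (slotSharingGroups <= 0) {
            throw new IllegalArgumentException("need at least one slot sharing group");
        }
        return availableSlots / slotSharingGroups;
    }

    /**
     * Parallelism of an operator: the minimum of the slots its group received and the
     * configured upper bound (maxParallelism when running in reactive mode).
     */
    static int operatorParallelism(int slotsOfGroup, int upperBound) {
        return Math.min(slotsOfGroup, upperBound);
    }

    public static void main(String[] args) {
        int slots = slotsPerGroup(10, 2);
        // Bounded by maxParallelism: some slots of the group stay unused.
        System.out.println(operatorParallelism(slots, 3));
        // Bounded by the available slots: runs below maxParallelism.
        System.out.println(operatorParallelism(slots, 128));
    }
}
```

The first call in main shows the "unused slots" case from point 1, the
second shows the operator running below its maxParallelism as in point 7.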

The reactive mode is intended as the first step towards more elastic
streaming pipelines and simplified operations. We see it as the foundation
for more advanced features such as true auto-scaling, where each operator
can decide its own parallelism. I hope this helps you understand the
reactive mode a bit better.

Cheers,
Till

On Wed, May 5, 2021 at 7:50 PM Ken Krugler 
wrote:

> Hi Vishal,
>
> WRT “bring down our internal services” - a common pattern with making
> requests to external services is to measure latency, and throttle (delay)
> requests in response to increased latency.
>
> You’ll see this discussed frequently on web crawling forums as an
> auto-tuning approach.
>
> Typically there’s a steady increase in latency as load on the service
> increases.
>
> The trick is throttling soon enough before you hit the “elbow” where a
> service effectively falls over.
>
> — Ken
>
>
>
> On May 5, 2021, at 9:08 AM, vishalovercome  wrote:
>
> Yes. While back-pressure would eventually ensure high throughput, hand
> tuning
> parallelism became necessary because the job with high source parallelism
> would immediately bring down our internal services - not giving enough time
> to flink to 

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
Thank you.

I will have a look at DataStream#global().

Is there any other way to maintain state, for example by using ValueState?


On Thu, 6 May 2021 at 1:26 PM, Arvid Heise  wrote:

> If you keyby then all direct functions see only the elements with the same
> key. So that's the expected behavior and the base of Flink's parallel
> processing capabilities.
>
> If you want to generate a window over all customers, you have to use a
> global window. However, that also means that no parallelization can happen,
> so I'd discourage that.
>
> A better way would be to perform as many calculations as possible in the
> process function (for example create a customer with buy information
> record) and then have a DataStream#global() reshuffle to collect all
> aggregated information on one node.
>
> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra  wrote:
>
>> Thank you.
>>
>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>
>> Adding this to the source context worked.
>>
>> However I am still getting only one customer in the process method. i would 
>> expect the iterable to provide all customers in the window. or do i have to 
>> maintain state.
>>
>>
>> changes for reference:
>>
>> I made the following change, also removed anly lag that i had introduced for 
>> experimentation earlier.
>>
>> so trigger looks like:
>>
>>
>> @Override
>> public TriggerResult onElement(Customer customer, long l, TimeWindow 
>> timeWindow, TriggerContext triggerContext) throws Exception {
>> if (timeWindow.maxTimestamp() <= 
>> triggerContext.getCurrentWatermark()) {
>> // if the watermark is already past the window fire immediately
>> return TriggerResult.FIRE;
>> } else {
>> //LOGGER.info("Max timestamp for customer: " + 
>> customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>> triggerContext.registerEventTimeTimer(customer.getEventTime());  
>>   return TriggerResult.FIRE;
>>
>> }
>> }
>>
>> @Override
>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, 
>> TriggerContext triggerContext) {
>> return time == timeWindow.maxTimestamp() ?
>> TriggerResult.FIRE :
>> TriggerResult.CONTINUE;
>> }
>>
>> @Override
>> public TriggerResult onProcessingTime(long time, TimeWindow window, 
>> TriggerContext ctx) throws Exception {
>> return TriggerResult.CONTINUE;
>> }
>>
>> @Override
>> public void clear(TimeWindow window, TriggerContext ctx) throws 
>> Exception {
>> ctx.deleteEventTimeTimer(window.maxTimestamp());
>> }
>>
>> @Override
>> public boolean canMerge() {
>> return true;
>> }
>>
>> and *removed latenness*
>>
>> customerStream
>>
>> 
>> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>> .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>> .keyBy((KeySelector) Customer::getIdentifier)
>> .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>> //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>> .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>> .process(new CustomAggregateFunction());
>>
>>
>> On Thu, May 6, 2021 at 12:32 PM Arvid Heise  wrote:
>>
>>> Your source is not setting the timestamp with collectWithTimestamp. I'm
>>> assuming that nothing really moves from the event time's perspective.
>>>
>>> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra  wrote:
>>>
>>>> Yes customer generator is setting the event timestamp correctly like I
>>>> see below. I debugged and found that the events are getting late, so never
>>>> executed. i.e,. in the window operator the method  this.isWindowLate(
>>>> actualWindow) is getting executed to false for the rest of the events
>>>> except the first, hence the events are getting skipped, not able to figure
>>>> out where exactly the issue is.
>>>>
>>>> i have removed evictot=r because I don't think I need it yet.
>>>>
>>>> stream looks like
>>>>
>>>> customerStream
>>>> .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>> .keyBy((KeySelector) Customer::getIdentifier)
>>>>
>>>> .window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
>>>> .trigger(new EventTimeTrigger())
>>>> .process(new CustomAggregateFunction());
>>>>
>>>>
>>>> *Customer generator looks like:*
>>>>
>>>> while (isRunning) {
>>>> Customer c = new Customer(CUSTOMER_KEY[counter % 5],*
>>>> LocalTime.now()*, 1000); // that's the event time
>>>> System.out.println("Writing customer: " + c);
>>>> sourceContext.collect(c);
>>>> //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>>>> Thread.sleep(1000);
>>>> counter++;
>>>> if(counter % 11 == 0) {
>>>> System.out.println("Sleeping for 10 seconds");
>>>>

Re: Session Windows - not working as expected

2021-05-06 Thread Arvid Heise
If you keyBy, then all direct functions see only the elements with the same
key. That's the expected behavior and the basis of Flink's parallel
processing capabilities.

If you want to generate a window over all customers, you have to use a
global window. However, that also means that no parallelization can happen,
so I'd discourage that.

A better way would be to perform as many calculations as possible in the
process function (for example create a customer with buy information
record) and then have a DataStream#global() reshuffle to collect all
aggregated information on one node.
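
The two-step idea above, sketched in plain Java rather than with the Flink
API: each key is first reduced to one small summary (the part that can run
in parallel), and only those summaries are then brought together in one
place (the role DataStream#global() would play). The Purchase type and all
method names are hypothetical and only serve the illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: per-key pre-aggregation followed by a single collecting step. */
final class PreAggregateThenCollectSketch {

    /** Hypothetical input element: one purchase made by one customer. */
    static final class Purchase {
        final String customerId;
        final int quantity;

        Purchase(String customerId, int quantity) {
            this.customerId = customerId;
            this.quantity = quantity;
        }
    }

    /** Step 1: aggregate per key; each key only ever sees its own elements. */
    static Map<String, Integer> aggregatePerCustomer(List<Purchase> purchases) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (Purchase p : purchases) {
            totals.merge(p.customerId, p.quantity, Integer::sum);
        }
        return totals;
    }

    /** Step 2: the small per-key results are collected in one place and combined. */
    static int combineAll(Map<String, Integer> perCustomerTotals) {
        int sum = 0;
        for (int total : perCustomerTotals.values()) {
            sum += total;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Purchase> purchases = Arrays.asList(
                new Purchase("c1", 2), new Purchase("c2", 1), new Purchase("c1", 3));
        Map<String, Integer> perCustomer = aggregatePerCustomer(purchases);
        System.out.println(perCustomer + " -> " + combineAll(perCustomer));
    }
}
```

The point of the split is that the expensive work happens per key, so the
single collecting step only has to handle one record per customer.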

On Thu, May 6, 2021 at 9:20 AM Swagat Mishra  wrote:

> Thank you.
>
> sourceContext.collectWithTimestamp(c, c.getEventTime());
>
> Adding this to the source context worked.
>
> However I am still getting only one customer in the process method. i would 
> expect the iterable to provide all customers in the window. or do i have to 
> maintain state.
>
>
> changes for reference:
>
> I made the following change, also removed anly lag that i had introduced for 
> experimentation earlier.
>
> so trigger looks like:
>
>
> @Override
> public TriggerResult onElement(Customer customer, long l, TimeWindow 
> timeWindow, TriggerContext triggerContext) throws Exception {
> if (timeWindow.maxTimestamp() <= 
> triggerContext.getCurrentWatermark()) {
> // if the watermark is already past the window fire immediately
> return TriggerResult.FIRE;
> } else {
> //LOGGER.info("Max timestamp for customer: " + 
> customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
> triggerContext.registerEventTimeTimer(customer.getEventTime());   
>  return TriggerResult.FIRE;
>
> }
> }
>
> @Override
> public TriggerResult onEventTime(long time, TimeWindow timeWindow, 
> TriggerContext triggerContext) {
> return time == timeWindow.maxTimestamp() ?
> TriggerResult.FIRE :
> TriggerResult.CONTINUE;
> }
>
> @Override
> public TriggerResult onProcessingTime(long time, TimeWindow window, 
> TriggerContext ctx) throws Exception {
> return TriggerResult.CONTINUE;
> }
>
> @Override
> public void clear(TimeWindow window, TriggerContext ctx) throws Exception 
> {
> ctx.deleteEventTimeTimer(window.maxTimestamp());
> }
>
> @Override
> public boolean canMerge() {
> return true;
> }
>
> and *removed latenness*
>
> customerStream
>
> 
> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
> .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
> .keyBy((KeySelector) Customer::getIdentifier)
> .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
> //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
> .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
> .process(new CustomAggregateFunction());
>
>
> On Thu, May 6, 2021 at 12:32 PM Arvid Heise  wrote:
>
>> Your source is not setting the timestamp with collectWithTimestamp. I'm
>> assuming that nothing really moves from the event time's perspective.
>>
>> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra  wrote:
>>
>>> Yes customer generator is setting the event timestamp correctly like I
>>> see below. I debugged and found that the events are getting late, so never
>>> executed. i.e,. in the window operator the method  this.isWindowLate(
>>> actualWindow) is getting executed to false for the rest of the events
>>> except the first, hence the events are getting skipped, not able to figure
>>> out where exactly the issue is.
>>>
>>> i have removed evictot=r because I don't think I need it yet.
>>>
>>> stream looks like
>>>
>>> customerStream
>>> .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>> .keyBy((KeySelector) Customer::getIdentifier)
>>> 
>>> .window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
>>> .trigger(new EventTimeTrigger())
>>> .process(new CustomAggregateFunction());
>>>
>>>
>>> *Customer generator looks like:*
>>>
>>> while (isRunning) {
>>> Customer c = new Customer(CUSTOMER_KEY[counter % 5],* LocalTime.now()*, 
>>> 1000); // that's the event time
>>> System.out.println("Writing customer: " + c);
>>> sourceContext.collect(c);
>>> //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>>> Thread.sleep(1000);
>>> counter++;
>>> if(counter % 11 == 0) {
>>> System.out.println("Sleeping for 10 seconds");
>>> Thread.sleep(1);
>>> }
>>> }
>>>
>>>
>>> Custom Watermark generator has this:
>>>
>>> .
>>> @Override
>>> public void onEvent(Customer customer, long l, WatermarkOutput 
>>> watermarkOutput) {
>>> currentMaxTimestamp = Math.max(currentMaxTimestamp, 
>>> customer.getEventTime()  );
>>> }
>>>
>>> @Override

Unsubscribe

2021-05-06 Thread Xander Song
How can I unsubscribe from the Apache Flink user mailing list? I have tried
emailing user-unsubscr...@flink.apache.org, but am still receiving messages.

Thank you.


No result shown when submitting the SQL in cli

2021-05-06 Thread tao xiao
Hi team,

I wrote a simple SQL job to select data from Kafka. I can see results
printing out in IDE but when I submit the job to a standalone cluster in
CLI there is no result shown. I am sure the job is running well in the
cluster with debug log suggesting that the kafka consumer is fetching data
from Kafka. I enabled debug log in CLI and I don't see any obvious log.
Here is the job code snippet

public static void main(String[] args) throws Exception {
  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
      StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1));

  String sqls = new String(Files.readAllBytes(Paths.get(args[0])));
  splitIgnoreQuota(sqls, ';').forEach(sql -> {
    TableResult tableResult = tableEnv.executeSql(sql);
    tableResult.print();
  });
}

It simply parses a SQL file and executes the statements.
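
The splitIgnoreQuota helper itself is not shown in this message. A minimal
sketch of what such a helper might look like, assuming it splits on the
delimiter only outside single-quoted SQL literals and drops empty
statements (the real implementation may well differ):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the splitIgnoreQuota helper used in the job above. */
final class SqlSplitterSketch {

    /** Splits text on the delimiter, ignoring delimiters inside single-quoted literals. */
    static List<String> splitIgnoreQuota(String text, char delimiter) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuote = false;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == '\'') {
                // An escaped quote ('') toggles twice, so the state ends up unchanged.
                inQuote = !inQuote;
            }
            if (c == delimiter && !inQuote) {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current);
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder statement) {
        String trimmed = statement.toString().trim();
        if (!trimmed.isEmpty()) {
            out.add(trimmed);
        }
    }

    public static void main(String[] args) {
        String sql = "CREATE TABLE t (a STRING) WITH ('k' = 'a;b'); SELECT * FROM t";
        splitIgnoreQuota(sql, ';').forEach(System.out::println);
    }
}
```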

Here are the SQL statements

CREATE TABLE t1 (
  `f1` STRING,
  `f2` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic',
  'properties.group.id' = 'test1',
  'properties.max.partition.fetch.bytes' = '16384',
  'properties.enable.auto.commit' = 'false',
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

SELECT * FROM t1


Below is the result I got from IDE
| +I | b8f5 |   abcd |
| +I | b8f5 |   abcd |

And this is the result from CLI
bin/flink run  -m localhost:8081 -c kafka.sample.flink.SQLSample
~/workspaces/kafka-sample/target/kafka-sample-0.1.2-jar-with-dependencies.jar
/sample.sql
+--------+
| result |
+--------+
|     OK |
+--------+
1 row in set
Job has been submitted with JobID ace45d2ff850675243e2663d3bf11701
+----+--------+-----+
| op |   uuid | ots |
+----+--------+-----+


-- 
Regards,
Tao


some questions about data skew

2021-05-06 Thread jester jim
Hi,
I run a program to monitor the sum of the delay for every minute of a
stream. This is my code:

.map(new RichMapFunction[String, (Long, Int)] {
  override def map(in: String): (Long, Int) = {
    try {
      val arr = in.split("\\|")
      // key = delay in whole minutes, value = 1 (one record)
      ((System.currentTimeMillis() / 1000 - arr(10).trim.toLong) / 60, 1)
    } catch {
      case e: Exception =>
        // log the offending record (the original printed an always-empty string)
        System.out.println("data has been dropped" + in)
        null
    }
  }
}).slotSharingGroup("kafkaSource").setParallelism(200)
.filter(item => item != null && item._1 >= 0).slotSharingGroup("kafkaSource").setParallelism(200)

signalSource.keyBy(f => f._1)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .reduce { (e1, e2) => (e1._1, e1._2 + e2._2) }.setParallelism(20).slotSharingGroup("Delay")
  .addSink(new OracleSink).setParallelism(1).slotSharingGroup("OracleSink").name("OracleSinkDelay")

But there is a problem: when the data is not delayed, keys 1, 2, 3, 4 and 5
receive so much data that the backpressure is always 1. Is there any way to
avoid this?

Please give me some advice. Thank you so much.
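
For readers following along: the key in the job above is the delay in whole
minutes, so records that arrive on time all fall into the same few small
keys. A minimal plain-Java sketch of that arithmetic (the class and method
names are made up for illustration; the timestamps are arbitrary example
values):

```java
/** Illustrative only: reproduces the key computation from the map function above. */
final class DelayBucketSketch {

    /** Delay in whole minutes between "now" (milliseconds) and the event time (epoch seconds). */
    static long delayMinutes(long nowMillis, long eventEpochSeconds) {
        return (nowMillis / 1000 - eventEpochSeconds) / 60;
    }

    public static void main(String[] args) {
        long nowMillis = 1_620_000_400_000L;
        // Events that are 10 s, 30 s and 59 s old all map to the same key.
        System.out.println(delayMinutes(nowMillis, 1_620_000_390L));
        System.out.println(delayMinutes(nowMillis, 1_620_000_370L));
        System.out.println(delayMinutes(nowMillis, 1_620_000_341L));
        // Only an event that is several minutes old gets a different key.
        System.out.println(delayMinutes(nowMillis, 1_620_000_060L));
    }
}
```

With a keyBy on this value, every on-time record is routed to the subtasks
that own those few keys, regardless of the configured parallelism of 20.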


Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
Thank you.

sourceContext.collectWithTimestamp(c, c.getEventTime());

Adding this to the source context worked.

However, I am still getting only one customer in the process method. I
would expect the iterable to provide all customers in the window. Or do I
have to maintain state?


Changes for reference:

I made the following change, and also removed any lag that I had
introduced for experimentation earlier.

So the trigger looks like:


@Override
public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow,
        TriggerContext triggerContext) throws Exception {
    if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
        triggerContext.registerEventTimeTimer(customer.getEventTime());
        return TriggerResult.FIRE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow timeWindow,
        TriggerContext triggerContext) {
    return time == timeWindow.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window,
        TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.deleteEventTimeTimer(window.maxTimestamp());
}

@Override
public boolean canMerge() {
    return true;
}

and *removed lateness*

customerStream
    //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
    .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
    .keyBy((KeySelector) Customer::getIdentifier)
    .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
    //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
    .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
    .process(new CustomAggregateFunction());


On Thu, May 6, 2021 at 12:32 PM Arvid Heise  wrote:

> Your source is not setting the timestamp with collectWithTimestamp. I'm
> assuming that nothing really moves from the event time's perspective.
>
> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra  wrote:
>
>> Yes customer generator is setting the event timestamp correctly like I
>> see below. I debugged and found that the events are getting late, so never
>> executed. i.e,. in the window operator the method  this.isWindowLate(
>> actualWindow) is getting executed to false for the rest of the events
>> except the first, hence the events are getting skipped, not able to figure
>> out where exactly the issue is.
>>
>> i have removed evictot=r because I don't think I need it yet.
>>
>> stream looks like
>>
>> customerStream
>> .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>> .keyBy((KeySelector) Customer::getIdentifier)
>> 
>> .window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
>> .trigger(new EventTimeTrigger())
>> .process(new CustomAggregateFunction());
>>
>>
>> *Customer generator looks like:*
>>
>> while (isRunning) {
>> Customer c = new Customer(CUSTOMER_KEY[counter % 5],* LocalTime.now()*, 
>> 1000); // that's the event time
>> System.out.println("Writing customer: " + c);
>> sourceContext.collect(c);
>> //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>> Thread.sleep(1000);
>> counter++;
>> if(counter % 11 == 0) {
>> System.out.println("Sleeping for 10 seconds");
>> Thread.sleep(1);
>> }
>> }
>>
>>
>> Custom Watermark generator has this:
>>
>> .
>> @Override
>> public void onEvent(Customer customer, long l, WatermarkOutput 
>> watermarkOutput) {
>> currentMaxTimestamp = Math.max(currentMaxTimestamp, 
>> customer.getEventTime()  );
>> }
>>
>> @Override
>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>
>> }
>> .
>>
>> trigger looks like:
>>
>> --
>>
>>
>>  @Override
>> public TriggerResult onElement(Customer customer, long l, TimeWindow 
>> timeWindow, TriggerContext triggerContext) throws Exception {
>> if (timeWindow.maxTimestamp() <= 
>> triggerContext.getCurrentWatermark()) {
>> // if the watermark is already past the window fire immediately
>> return TriggerResult.FIRE;
>> } else {
>> LOGGER.info("Max timestamp for customer: " + 
>> customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>> 
>> triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>>

Re: Session Windows - not working as expected

2021-05-06 Thread Arvid Heise
Your source is not setting the timestamp with collectWithTimestamp. I'm
assuming that nothing really moves from the event time's perspective.
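
To illustrate why stalled event time matters, here is a minimal plain-Java
sketch of the firing condition for an event-time session window. It does
not use the Flink API; the Window class and the method names are made up,
and the merge rule (windows that overlap or touch are merged) is an
assumption about how session windows behave.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: session windows for ONE key and the watermark-based firing check. */
final class SessionWindowSketch {

    /** A window [start, end); its max timestamp is end - 1. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() {
            return end - 1;
        }
    }

    /** Groups sorted event timestamps into sessions; each event extends its session by gapMillis. */
    static List<Window> sessions(long[] sortedTimestamps, long gapMillis) {
        List<Window> result = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return result;
        }
        long start = sortedTimestamps[0];
        long end = start + gapMillis;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long ts = sortedTimestamps[i];
            if (ts > end) {
                // Gap exceeded: close the current session and open a new one.
                result.add(new Window(start, end));
                start = ts;
                end = ts + gapMillis;
            } else {
                end = Math.max(end, ts + gapMillis);
            }
        }
        result.add(new Window(start, end));
        return result;
    }

    /** An event-time window can fire only once the watermark has reached its max timestamp. */
    static boolean canFire(Window window, long watermark) {
        return window.maxTimestamp() <= watermark;
    }

    public static void main(String[] args) {
        List<Window> windows = sessions(new long[] {0, 1000, 2000, 20000}, 5000);
        Window first = windows.get(0);
        System.out.println("first session: [" + first.start + ", " + first.end + ")");
        System.out.println("fires at watermark 2000?  " + canFire(first, 2000));
        System.out.println("fires at watermark 20000? " + canFire(first, 20000));
    }
}
```

If the elements carry no usable timestamps, the watermark never reaches the
max timestamp of any session, so from the window operator's point of view
nothing is ever ready to fire.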

On Thu, May 6, 2021 at 8:58 AM Swagat Mishra  wrote:

> Yes customer generator is setting the event timestamp correctly like I see
> below. I debugged and found that the events are getting late, so never
> executed. i.e,. in the window operator the method  this.isWindowLate(
> actualWindow) is getting executed to false for the rest of the events
> except the first, hence the events are getting skipped, not able to figure
> out where exactly the issue is.
>
> i have removed evictot=r because I don't think I need it yet.
>
> stream looks like
>
> customerStream
> .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
> .keyBy((KeySelector) Customer::getIdentifier)
> 
> .window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
> .trigger(new EventTimeTrigger())
> .process(new CustomAggregateFunction());
>
>
> *Customer generator looks like:*
>
> while (isRunning) {
> Customer c = new Customer(CUSTOMER_KEY[counter % 5],* LocalTime.now()*, 
> 1000); // that's the event time
> System.out.println("Writing customer: " + c);
> sourceContext.collect(c);
> //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
> Thread.sleep(1000);
> counter++;
> if(counter % 11 == 0) {
> System.out.println("Sleeping for 10 seconds");
> Thread.sleep(1);
> }
> }
>
>
> Custom Watermark generator has this:
>
> .
> @Override
> public void onEvent(Customer customer, long l, WatermarkOutput 
> watermarkOutput) {
> currentMaxTimestamp = Math.max(currentMaxTimestamp, 
> customer.getEventTime()  );
> }
>
> @Override
> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>
> }
> .
>
> trigger looks like:
>
> --
>
>
>  @Override
> public TriggerResult onElement(Customer customer, long l, TimeWindow 
> timeWindow, TriggerContext triggerContext) throws Exception {
> if (timeWindow.maxTimestamp() <= 
> triggerContext.getCurrentWatermark()) {
> // if the watermark is already past the window fire immediately
> return TriggerResult.FIRE;
> } else {
> LOGGER.info("Max timestamp for customer: " + 
> customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
> 
> triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
> return TriggerResult.FIRE;
> }
> }
>
> @Override
> public TriggerResult onEventTime(long time, TimeWindow timeWindow, 
> TriggerContext triggerContext) {
> //if (timeWindow.maxTimestamp() > 
> triggerContext.getCurrentWatermark()) {
> //
> triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
> //return TriggerResult.CONTINUE;
> //}
>
> return time == timeWindow.maxTimestamp() ?
> TriggerResult.FIRE :
> TriggerResult.CONTINUE;
> }
>
>
> 
>
>
> On Thu, May 6, 2021 at 12:02 PM Arvid Heise  wrote:
>
>> Hi,
>>
>> Is your CustomerGenerator setting the event timestamp correctly? Are your
>> evictors evicting too early?
>>
>> You can try to add some debug output into the watermark assigner and see
>> if it's indeed progressing as expected.
>>
>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra  wrote:
>>
>>> This seems to be working fine in processing time but doesn't work in
>>> event time. Is there an issue with the way the water mark is defined or do
>>> we need to set up timers?
>>>
>>> Please advise.
>>>
>>>
>>> WORKING:
>>>
>>> customerStream
>>> .keyBy((KeySelector) Customer::getIdentifier)
>>> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>> .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>> .process(new CustomAggregateFunction());
>>>
>>>
>>> NOT WORKING:
>>>
>>> customerStream
>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
>>> WaterMarkAssigner()))
>>> .keyBy(Customer::getIdentifier)
>>> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>> .trigger(EventTimeTrigger.create())
>>> .evictor(new CustomerEvictor())
>>> .process(new CustomAggregateFunction())
>>> .print();
>>>
>>>
>>> On Thu, May 6, 2021 at 1:53 AM Sam  wrote:
>>>
>>>> Adding the code for CustomWatermarkGenerator
>>>>
>>>> .
>>>> @Override
>>>> public void onEvent(Customer customer, long l, WatermarkOutput
>>>> watermarkOutput) {
>>>> currentMaxTimestamp = Math.max(currentMaxTimestamp,
>>>> customer.getEventTime()  );
>>>> }
>>>>
>>>> @Override
>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>
>>>> }
>>>>

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
Yes, the customer generator is setting the event timestamp correctly, as
shown below. I debugged and found that the events are treated as late, so
they are never processed. That is, in the window operator the method
this.isWindowLate(actualWindow) evaluates to false for all events except
the first, hence the events are getting skipped. I am not able to figure
out where exactly the issue is.

I have removed the evictor because I don't think I need it yet.

The stream looks like:

customerStream
    .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
    .keyBy((KeySelector) Customer::getIdentifier)
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .allowedLateness(Time.seconds(5))
    .trigger(new EventTimeTrigger())
    .process(new CustomAggregateFunction());


*Customer generator looks like:*

while (isRunning) {
    // LocalTime.now() is the event time
    Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000);
    System.out.println("Writing customer: " + c);
    sourceContext.collect(c);
    //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
    Thread.sleep(1000);
    counter++;
    if (counter % 11 == 0) {
        System.out.println("Sleeping for 10 seconds");
        Thread.sleep(1);
    }
}


Custom Watermark generator has this:

.
@Override
public void onEvent(Customer customer, long l, WatermarkOutput
watermarkOutput) {
currentMaxTimestamp = Math.max(currentMaxTimestamp,
customer.getEventTime()  );
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));

}
.

trigger looks like:

--


@Override
public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow,
        TriggerContext triggerContext) throws Exception {
    if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        LOGGER.info("Max timestamp for customer: " +
                customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
        triggerContext.registerEventTimeTimer(
                customer.getLocalTime().plusSeconds(8).toNanoOfDay());
        return TriggerResult.FIRE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow timeWindow,
        TriggerContext triggerContext) {
    //if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
    //    triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
    //    return TriggerResult.CONTINUE;
    //}

    return time == timeWindow.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
}





On Thu, May 6, 2021 at 12:02 PM Arvid Heise  wrote:

> Hi,
>
> Is your CustomerGenerator setting the event timestamp correctly? Are your
> evictors evicting too early?
>
> You can try to add some debug output into the watermark assigner and see
> if it's indeed progressing as expected.
>
> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra  wrote:
>
>> This seems to be working fine in processing time but doesn't work in
>> event time. Is there an issue with the way the water mark is defined or do
>> we need to set up timers?
>>
>> Please advise.
>>
>>
>> WORKING:
>>
>> customerStream
>> .keyBy((KeySelector) Customer::getIdentifier)
>> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>> .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>> .process(new CustomAggregateFunction());
>>
>>
>> NOT WORKING:
>>
>> customerStream
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
>> WaterMarkAssigner()))
>> .keyBy(Customer::getIdentifier)
>> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>> .trigger(EventTimeTrigger.create())
>> .evictor(new CustomerEvictor())
>> .process(new CustomAggregateFunction())
>> .print();
>>
>>
>> On Thu, May 6, 2021 at 1:53 AM Sam  wrote:
>>
>>> Adding the code for CustomWatermarkGenerator
>>>
>>> .
>>> @Override
>>> public void onEvent(Customer customer, long l, WatermarkOutput 
>>> watermarkOutput) {
>>> currentMaxTimestamp = Math.max(currentMaxTimestamp, 
>>> customer.getEventTime()  );
>>> }
>>>
>>> @Override
>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>
>>> }
>>> .
>>>
>>>
>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra  wrote:
>>>
>>>> Hi,
>>>>
>>>> Bit of background, I have a stream of customers who have purchased some
>>>> product, reading these transactions on a KAFKA topic. I want to aggregate
>>>> the number of products the customer has purchased in a particular duration
>>>> ( say 10 seconds ) and write to a sink.
>>>>
>>>> I am using session windows to achieve the above.
>>>>

Re: Session Windows - not working as expected

2021-05-06 Thread Arvid Heise
Hi,

Is your CustomerGenerator setting the event timestamp correctly? Are your
evictors evicting too early?

You can try to add some debug output into the watermark assigner and see if
it's indeed progressing as expected.

On Thu, May 6, 2021 at 12:48 AM Swagat Mishra  wrote:

> This seems to be working fine in processing time but doesn't work in event
> time. Is there an issue with the way the water mark is defined or do we
> need to set up timers?
>
> Please advise.
>
>
> WORKING:
>
> customerStream
> .keyBy((KeySelector) Customer::getIdentifier)
> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
> .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
> .process(new CustomAggregateFunction());
>
>
> NOT WORKING:
>
> customerStream
> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
> WaterMarkAssigner()))
> .keyBy(Customer::getIdentifier)
> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
> .trigger(EventTimeTrigger.create())
> .evictor(new CustomerEvictor())
> .process(new CustomAggregateFunction())
> .print();
>
>
> On Thu, May 6, 2021 at 1:53 AM Sam  wrote:
>
>> Adding the code for CustomWatermarkGenerator
>>
>> .
>> @Override
>> public void onEvent(Customer customer, long l, WatermarkOutput 
>> watermarkOutput) {
>> currentMaxTimestamp = Math.max(currentMaxTimestamp, 
>> customer.getEventTime()  );
>> }
>>
>> @Override
>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>
>> }
>> .
>>
>>
>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra  wrote:
>>
>>> Hi,
>>>
>>> Bit of background, I have a stream of customers who have purchased some
>>> product, reading these transactions on a KAFKA topic. I want to aggregate
>>> the number of products the customer has purchased in a particular duration
>>> ( say 10 seconds ) and write to a sink.
>>>
>>> I am using session windows to achieve the above.
>>>
>>> For test purposes, i have mocked  up a customer stream and executed
>>> session windows like below.
>>>
>>> StreamExecutionEnvironment environment = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream customerStream = environment.addSource( new 
>>> CustomerGenerator() );
>>>
>>> customerStream
>>> 
>>> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
>>> WaterMarkAssigner()))
>>> .keyBy(Customer::getIdentifier)
>>> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>> .trigger(EventTimeTrigger.create())
>>> .evictor(new CustomerEvictor())
>>> .process(new CustomAggregateFunction())
>>> .print();
>>>
>>> My watermark assigner looks like:
>>>
>>> public class WaterMarkAssigner implements WatermarkStrategy {
>>> static final Logger logger = 
>>> LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>
>>> @Override
>>> public WatermarkGenerator 
>>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>> return new CustomWatermarkGenerator();
>>> }
>>> }
>>>
>>> I notice that the evictor, and aggregation functions are getting called 
>>> only once for the first customer in the stream.
>>>
>>> The data stream is generating customers at 1 seconds interval and there are 
>>> 5 customer keys for which it's generating transactions.
>>>
>>> Am I doing something wrong with the above?
>>>
>>> I want to be able to capture the event on each transaction getting added 
>>> and removed from the window so that I can perform the aggregation.
>>>
>>> please advise.
>>>
>>>
>>>
>>>
>>>


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-06 Thread Yun Tang
Thanks to Dawid and Guowei for their great work, and thanks to everyone
involved in this release.

Best
Yun Tang

From: Xintong Song 
Sent: Thursday, May 6, 2021 12:08
To: user ; dev 
Subject: Re: [ANNOUNCE] Apache Flink 1.13.0 released

Thanks Dawid & Guowei as the release managers, and everyone who has
contributed to this release.


Thank you~

Xintong Song



On Thu, May 6, 2021 at 9:51 AM Leonard Xu  wrote:

> Thanks Dawid & Guowei for the great work, thanks everyone involved.
>
> Best,
> Leonard
>
> On May 5, 2021, at 17:12, Theo Diefenthal  wrote:
>
> Thanks for managing the release. +1. I like the focus on improving
> operations with this version.
>
> --
> *From: *"Matthias Pohl" 
> *To: *"Etienne Chauchot" 
> *CC: *"dev" , "Dawid Wysakowicz" <
> dwysakow...@apache.org>, "user" ,
> annou...@apache.org
> *Sent: *Tuesday, May 4, 2021 21:53:31
> *Subject: *Re: [ANNOUNCE] Apache Flink 1.13.0 released
>
> Yes, thanks for managing the release, Dawid & Guowei! +1
>
> On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot 
> wrote:
>
>> Congrats to everyone involved !
>>
>> Best
>>
>> Etienne
>> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.13.0.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements in this release:
>> https://flink.apache.org/news/2021/05/03/release-1.13.0.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Guowei & Dawid
>>
>>
>
>


Re: A running stream job throws a RocksDB-level error: Error while retrieving data from RocksDB

2021-05-06 Thread Yun Tang
Hi,

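You can refer to the documentation [1]:
Because RocksDB's JNI API is built on the byte[] data structure, the maximum
supported size of each key and each value is 2^31 bytes. Important: state that
uses RocksDB merge operations (for example, ListState) can accumulate more than
2^31 bytes of data, but the next attempt to read it will then fail. This is
currently a limitation of RocksDB JNI.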

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/ops/state/state_backends.html#rocksdbstatebackend
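
To get a rough feel for when a list state could hit this limit, here is a minimal arithmetic sketch in plain Java. The class name, method names, and the 1 KiB element size are hypothetical; the real serialized size depends on your serializer, and the sketch ignores any per-element overhead added by the state backend.

```java
public class ListStateSizeSketch {

    // Documented upper bound for a single RocksDB value accessed through JNI: 2^31 bytes.
    static final long MAX_VALUE_BYTES = 1L << 31;

    /** Largest number of elements of the given serialized size that still fits under the limit. */
    static long maxElements(long bytesPerElement) {
        return MAX_VALUE_BYTES / bytesPerElement;
    }

    /** True if elementCount elements of bytesPerElement bytes stay within the limit. */
    static boolean fitsInSingleValue(long elementCount, long bytesPerElement) {
        // multiplyExact throws on long overflow instead of silently wrapping around.
        return Math.multiplyExact(elementCount, bytesPerElement) <= MAX_VALUE_BYTES;
    }

    public static void main(String[] args) {
        long bytesPerElement = 1024L; // hypothetical serialized size of one buffered record
        System.out.println("max elements: " + maxElements(bytesPerElement));
        System.out.println("3,000,000 elements fit: " + fitsInSingleValue(3_000_000L, bytesPerElement));
    }
}
```

In the stack trace of this thread the failing read is the list state of a window operator (one list per key and window), so if the estimate is close to the limit, reducing what each window buffers, for example by aggregating incrementally instead of keeping every element, is one direction to consider.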

Best,
Yun Tang

From: a593700624 <593700...@qq.com>
Sent: Wednesday, April 28, 2021 15:19
To: user-zh@flink.apache.org 
Subject: A running stream job throws a RocksDB-level error: Error while retrieving data from RocksDB

org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:455)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:154)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:568)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM limit
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:810)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
    ... 20 more


The job runs for a few hours, but it always ends up hitting this problem and then gets stuck in a restart loop.


