Re: Does the flink run command support reading jar files from a remote file system?

2021-04-22 Thread JasonLee
Hi,

Session and per-job modes do not support it; application mode does.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink job fails to consume from Kafka and cannot obtain the offset

2021-04-22 Thread Qingsheng Ren
Hi Jacob,

From the error, it looks like the Kafka consumer could not connect to the Kafka brokers. The following may help with troubleshooting:

1. Confirm network connectivity between the Flink TaskManagers and the Kafka brokers.
2. Network connectivity between the TaskManagers and the brokers does not by itself mean that data can be consumed; the Kafka broker configuration may need to be adjusted. The article [1] may help; the vast majority of Kafka connection problems are caused by the configuration issues it describes.
3. Configure Log4j to set the log level of org.apache.kafka.clients.consumer to DEBUG or TRACE, so the logs contain more information to aid troubleshooting.

Hope this helps!

[1] 
https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/

—
Best Regards,

Qingsheng Ren
On Apr 14, 2021 at 12:13 PM +0800, Jacob <17691150...@163.com> wrote:
> A Flink job consumes messages from a Kafka topic that exists in two Kafka clusters, cluster A and cluster B. The job consumes the topic from cluster A without any problem, but after switching to cluster B it fails to start.
>
> The Flink cluster is deployed with Docker in standalone mode. Cluster resources and slots are sufficient. The error log is as follows:
>
> java.lang.Exception: org.apache.kafka.common.errors.TimeoutException:
> Timeout of 6ms expired before the position for partition Test-topic-27
> could be determined
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
> 6ms expired before the position for partition Test-topic-27 could be
> determined
>
> Searching around, most answers point to insufficient slots and similar causes, as well as Kafka broker load; these have already been ruled out.
>
> Please advise.
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Debezium CDC | OOM

2021-04-22 Thread Ayush Chauhan
Hi Matthias,

I am using RocksDB as a state backend. I think the iceberg sink is not able
to propagate back pressure to the source which is resulting in OOM for my
CDC pipeline.
Please refer to this - https://github.com/apache/iceberg/issues/2504



On Thu, Apr 22, 2021 at 8:44 PM Matthias Pohl 
wrote:

> Hi Ayush,
> Which state backend have you configured [1]? Have you considered trying
> out RocksDB [2]? RocksDB might help with persisting at least keyed state.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#choose-the-right-state-backend
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>
> On Thu, Apr 22, 2021 at 7:52 AM Ayush Chauhan 
> wrote:
>
>> Hi,
>> I am using flink cdc to stream CDC changes in an iceberg table. When I
>> first run the flink job for a topic which has all the data for a table, it
>> get out of heap memory as flink try to load all the data during my 15mins
>> checkpointing interval. Right now, only solution I have is to pass *-ytm
>> 8192 -yjm 2048m* for a table with 10M rows and then reduce it after
>> flink has consumed all the data. Is there a way to tell flink cdc code to
>> trigger checkpoint or throttle the consumption speed(I think backpressure
>> should have handled this)?
>>
>> --
>>  Ayush Chauhan
>>  Software Engineer | Data Platform
>>  [image: mobile-icon]  +91 9990747111
>>
>>
>> This email is intended only for the person or the entity to whom it is
>> addressed. If you are not the intended recipient, please delete this email
>> and contact the sender.
>>
>

-- 
 Ayush Chauhan
 Software Engineer | Data Platform
 [image: mobile-icon]  +91 9990747111

-- 

This email is intended only for the person or the entity to whom it is addressed. If you are not the intended recipient, please delete this email and contact the sender.



Re: Question about the upsert-kafka connector

2021-04-22 Thread Shengkai Fang
If the data is already stored in order in upsert-kafka (records with the same key are placed in the same partition), then Flink can preserve that order when consuming.
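
For illustration, here is a rough sketch (the table name, topic, and connection settings are made up, and the Flink 1.12 upsert-kafka connector is assumed) of declaring such a table: the PRIMARY KEY becomes the Kafka message key, so all records for one key are written to the same partition.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The PRIMARY KEY is used as the Kafka message key, so all updates for
        // one key go to (and are read back from) a single partition, in order.
        tEnv.executeSql(
                "CREATE TABLE user_balance (" +
                "  user_id STRING," +
                "  balance DECIMAL(10, 2)," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'user_balance'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");
    }
}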

Best,
Shengkai


Exactly-once guarantees across multiple complex operators

2021-04-22 Thread Colar
Hello,

I have code like the following:

datastream.process(new Process1()).process(new Process2())…

These process functions may perform fairly complex computations.

My question: to guarantee end-to-end exactly-once semantics, should I maintain state in every operator, or only in the last one?

Re: Official flink java client

2021-04-22 Thread Yun Gao
Hi gaurav,

Logically, the Flink client is embedded inside the StreamExecutionEnvironment, and users can
use the StreamExecutionEnvironment to execute their jobs. Could you share more about
why you want to use the client directly?
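
For illustration, a minimal sketch of this pattern (the host, port, and jar path are placeholders): the environment's embedded client submits the job when execute() is called.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmitExample {
    public static void main(String[] args) throws Exception {
        // createRemoteEnvironment points at an existing cluster; the embedded
        // client takes care of packaging and submitting the job.
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .createRemoteEnvironment("jobmanager-host", 8081, "/path/to/user-code.jar");

        env.fromElements(1, 2, 3).print();

        env.execute("remote-submit-example");
    }
}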

Best,
Yun



 --Original Mail --
Sender:gaurav kulkarni 
Send Date:Fri Apr 23 10:14:08 2021
Recipients:User 
Subject:Official flink java client

Hi, 

Is there any official flink client in java that's available? I came across 
RestClusterClient, but I am not sure if its official. I can create my own 
client, but just wanted to check if there is anything official available 
already that I can leverage. 

Thanks,
Gaurav

Run already deployed job on Flink Cluster using RestClusterClient
I am trying to run already deployed job on Flink Cluster using Rest request. I had success using a simple rest ...




Re: Re: Reply: Why does a Flink SQL job consuming Kafka and joining an ordinary table ramp up in throughput slowly?

2021-04-22 Thread Xi Shen
On my side, I added a local cache via the JDBC table properties.

I tried setting the cache size to 400/2/4 and restarting; in every case the Kafka consumption rate still had to ramp up slowly.
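
For reference, a rough sketch of the kind of lookup cache being described (the table, URL, and cache values are made up; the Flink 1.12 JDBC connector and a MySQL/TiDB-compatible driver on the classpath are assumed):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcLookupCacheExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'lookup.cache.max-rows' / 'lookup.cache.ttl' enable a per-task local
        // cache for lookup joins against the JDBC (e.g. TiDB/MySQL) table.
        tEnv.executeSql(
                "CREATE TABLE dim_table (" +
                "  id BIGINT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/db'," +
                "  'table-name' = 'dim_table'," +
                "  'lookup.cache.max-rows' = '40000'," +
                "  'lookup.cache.ttl' = '10min'" +
                ")");
    }
}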



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread 马阳阳
Hi Matthias,
We have “solved” the problem by tuning the join. But I will still try to answer the 
questions, hoping this helps.


* What is the option you're referring to for the bounded shuffle? That might 
help to understand what streaming mode solution you're looking for.

taskmanager.network.blocking-shuffle.type (default: "file", type: String): The blocking shuffle type, either "mmap" or "file". "auto" means selecting the type automatically based on the system memory architecture (64 bit for mmap and 32 bit for file). Note that the memory usage of mmap is not accounted for by the configured memory limits, but some resource frameworks like YARN track this memory usage and kill the container once the memory exceeds some threshold. Also note that this option is experimental and might be changed in the future.
* What does the job graph look like? Are you assuming that it's due to a 
shuffling operation? Could you provide the logs to get a better understanding 
of your case?
   The graph is a join of three streams, and we use RocksDB as the state backend. 
I think the crash is due to RocksDB. And I could not get the logs (because of a 
misconfiguration, the logs are empty). 
* Do you observe the same memory increase for other TaskManager nodes?

   After one TM is killed, the job fails, so I didn't see exactly the same 
memory increase for the other TMs. But I think the other TMs would show similar 
behavior because the data sizes they process are almost the same.
* Are you expecting to reach the memory limits considering that you mentioned a 
"big state size"? Would increasing the memory limit be an option or do you fear 
that it's caused by some memory leak?
  Changing the TM process memory to 18GB instead of 12GB did not help.


Based on the answers above, I think we should figure out why RocksDB 
overuses virtual memory, causing YARN to kill the container.


On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:

The Flink version we used is 1.12.0.


马阳阳
ma_yang_y...@163.com


On 04/16/2021 16:07,马阳阳 wrote:
Hi, community,
When running a Flink streaming job with big state size, one task manager 
process was killed by the yarn node manager. The following log is from the yarn 
node manager:


2021-04-16 11:51:23,013 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container 
[pid=521232,containerID=container_e157_1618223445363_16943_01_10] is 
running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0 GB of 
12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used. Killing 
container.


When searching for a solution to this problem, I found that there is an option for 
this that works for bounded (blocking) shuffles. So is there a way to get rid of this in 
streaming mode?


PS:
memory related options:
taskmanager.memory.process.size:12288m
taskmanager.memory.managed.fraction:0.7



flink native k8s question [subject garbled in the archive]

2021-04-22 Thread ??
Flink 1.12.2, Native Kubernetes. The session cluster is started with:

./bin/kubernetes-session.sh \
 -Dkubernetes.namespace=flink-session-cluster \
 -Dkubernetes.jobmanager.service-account=flink \
 -Dkubernetes.cluster-id=session001 \
 -Dtaskmanager.memory.process.size=1024m \
 -Dkubernetes.taskmanager.cpu=1 \
 -Dtaskmanager.numberOfTaskSlots=4 \
 -Dresourcemanager.taskmanager-timeout=360

[garbled text; it mentions the service (svc) and its cluster IP]

A job is submitted with:

./bin/flink run -d \
 -e kubernetes-session \
 -Dkubernetes.namespace=flink-session-cluster \
 -Dkubernetes.cluster-id=session001 \
 examples/streaming/WindowJoin.jar

[the remainder of the question is garbled; it mentions Kubernetes]

Re: Reply: Why does a Flink SQL job consuming Kafka and joining an ordinary table ramp up in throughput slowly? [subject garbled; reconstructed from the thread above]

2021-04-22 Thread Michael Ran
[the reply and most of the quoted thread are garbled in the archive]

The recoverable fragments of the quoted messages mention: TiDB and structured-streaming; a SQL job that parses JSON and performs a join; figures of roughly 70s and 3.8k; and the TiDB JDBC URL options
useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true

--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink mysql cdc [subject garbled]

2021-04-22 Thread Michael Ran
[reply garbled; it mentions CDC and the binlog]
On 2021-04-22 14:22:18, <1353637...@qq.com> wrote (garbled): a question about flink mysql cdc that mentions MySQL and the binlog.


Re: [ANNOUNCE] Flink Jira Bot fully live (& Useful Filters to Work with the Bot)

2021-04-22 Thread Xintong Song
Thanks for driving this, Konstantin.
Great job~!

Thank you~

Xintong Song



On Thu, Apr 22, 2021 at 11:57 PM Matthias Pohl 
wrote:

> Thanks for setting this up, Konstantin. +1
>
> On Thu, Apr 22, 2021 at 11:16 AM Konstantin Knauf 
> wrote:
>
>> Hi everyone,
>>
>> all of the Jira Bot rules are live now. Particularly in the beginning the
>> Jira Bot will be very active, because the rules apply to a lot of old,
>> stale tickets. So, if you get a huge amount of emails from the Flink Jira
>> Bot right now, this will get better. In any case, the Flink Jira Bot (or
>> the rules that it implements) demand some changes to how we work with Jira.
>>
>> Here are a few points to make this transition easier for us:
>>
>> *1) Retrospective*
>>
>> In 3-4 weeks I would like to collect feedback. What is working well? What
>> is not working well or getting in your way? Is the bot moving us closer to
>> the goals mentioned in the initial email? Specifically, the
>> initial parameterization [1] of the bot was kind of an educated guess. I
>> will open a [DISCUSS]ion thread to collect feedback and proposals for
>> changes around that time.
>>
>> *2) Use Sub-Tasks*
>>
>> The bot will ask you for an update on assigned tickets after quite a
>> short time for Flink standards. If you are working on a ticket that takes
>> longer, consider splitting it up in Sub-Tasks. Activity on Sub-Tasks counts
>> as activity for the parent ticket, too. So, as long as any subtask is
>> moving along you won't be nagged by the bot.
>>
>>
>> *3) Useful Filters*
>>
>> You've probably received a lot of emails already, in particular if you
>> are watching many tickets. Here are a few JIRA filters to find the tickets,
>> that are probably most important to you and have been updated by the bot:
>>
>> Tickets that *you are assigned to*, which are "stale-assigned"
>>
>> https://issues.apache.org/jira/issues/?filter=12350499
>>
>> Tickets that *you reported*, which are stale in anyway:
>>
>> https://issues.apache.org/jira/issues/?filter=12350500
>>
>> If you are a maintainer of some components, you might find the following
>> filters useful (replace with your own components):
>>
>> *All tickets that are about to be closed*
>> project = FLINK AND component in ("Build System", "BuildSystem / Shaded",
>> "Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
>> flink-docker, "Release System", "Runtime / Coordination", "Runtime /
>> Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
>> resolution = Unresolved AND labels in (stale-minor)
>>
>> *Bugs that are about to be deprioritized or closed*
>> project = FLINK AND type = BUG AND component in ("Build System",
>> "BuildSystem / Shaded", "Deployment / Kubernetes", "Deployment / Mesos",
>> "Deployment / YARN", flink-docker, "Release System", "Runtime /
>> Coordination", "Runtime / Metrics", "Runtime / Queryable State", "Runtime /
>> REST", Travis) AND resolution = Unresolved AND labels in (stale-major,
>> stale-blocker, stale-critical, stale-minor)
>>
>>
>> *Tickets that are stale-assigned, but already have a PR available*project
>> = FLINK AND component in ("Build System", "BuildSystem / Shaded",
>> "Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
>> flink-docker, "Release System", "Runtime / Coordination", "Runtime /
>> Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
>> resolution = Unresolved AND labels in (stale-assigned) AND labels in
>> (pull-request-available)
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1] https://github.com/apache/flink-jira-bot/blob/master/config.yaml
>>
>>
>> --
>>
>> Konstantin Knauf
>>
>> https://twitter.com/snntrable
>>
>> https://github.com/knaufk
>>
>


Reply: Question about the upsert-kafka connector [subject garbled]

2021-04-22 Thread op
[message garbled; it mentions upsert-kafka and the key]

[quoted mail headers garbled]



Official flink java client

2021-04-22 Thread gaurav kulkarni
Hi, 
Is there any official flink client in java that's available? I came across 
RestClusterClient, but I am not sure if its official. I can create my own 
client, but just wanted to check if there is anything official available 
already that I can leverage. 
Thanks,
Gaurav

Run already deployed job on Flink Cluster using RestClusterClient
I am trying to run already deployed job on Flink Cluster using Rest request. I had success using a simple rest ...




Re: When using processElement() and onTimer() in a ProcessFunction, do I need to worry about concurrency?

2021-04-22 Thread 李一飞
These two methods are executed synchronously; only one of them can run at a time.
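
As a rough illustration (the types and logic are made up), this is why state touched in both callbacks needs no extra locking: both are invoked by the same task thread, never concurrently for the same operator instance.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountWithTimeout extends KeyedProcessFunction<String, String, String> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 60_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Runs on the same thread as processElement(), so reading and clearing
        // the state here does not race with element processing.
        out.collect(ctx.getCurrentKey() + " count=" + count.value());
        count.clear();
    }
}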
On 2021-04-22 15:35:07, "x2009438"  wrote:
>As the subject says; thanks, everyone.
>
>
>Sent from my iPhone


Re: Question: after enabling state.backend.incremental, the Checkpointed Data Size keeps growing

2021-04-22 Thread HunterXHunter
I didn't solve it; I could only turn it off.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
 <<  Added the Filter upfront  as below, the pipe has no issues. Also
metrics show that no data is being pushed through the sideoutput and that
data is not pulled from a simulated sideout ( below )

>> Added the Filter upfront  as below, the pipe has no issues. Also metrics
show that no data is being pushed through the sideoutput and that data is
*now* pulled from the simulated sideout, essentially the Process Function
with a reverse predicate to the Filter Process Function.


On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi 
wrote:

> And when I added the filter the Exception was not thrown. So the sequence
> of events
>
> * Increased lateness from 12 ( that was what it was initially running with
> )  to 24 hours
> * the pipe ran as desired before it blew up with the Exception
> * masked the issue by increasing the lateness to 48 hours.
> * It blew up again but now after the added lateness, so essentially the
> same issue but added lateness let the pipe run for another few hours.
> * Added the Filter upfront  as below, the pipe has no issues. Also metrics
> show that no data is being pushed through the sideoutput and that data is
> not pulled from a simulated sideout ( below )
>
>
> public class LateEventFilter<KEY, VALUE> extends ProcessFunction<KeyedTimedValue<KEY,
> VALUE>, KeyedTimedValue<KEY, VALUE>> {
> private static final long serialVersionUID = 1L;
>
> long allowedLateness;
> public LateEventFilter(long allowedLateness){
> this.allowedLateness = allowedLateness;
> }
> @Override
> public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
> Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
> if (ctx.timestamp() + allowedLateness > ctx.timerService().
> currentWatermark()) {
> out.collect(value);
> }
> }
> }
>
>
> public class LateEventSideOutput<KEY, VALUE> extends ProcessFunction<KeyedTimedValue<
> KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
> private static final long serialVersionUID = 1L;
>
> long allowedLateness;
> public LateEventSideOutput(long allowedLateness){
> this.allowedLateness = allowedLateness;
> }
> @Override
> public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
> Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
> if (ctx.timestamp() + allowedLateness <= ctx.timerService().
> currentWatermark()) {
> out.collect(value);
> }
> }
> }
>
>
>
>  I am using RocksDB as a backend if that helps.
>
> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi 
> wrote:
>
>> Yes sir. The allowedLateNess and side output always existed.
>>
>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
>> wrote:
>>
>>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>>> being added to your pipeline when running into the
>>> UnsupportedOperationException issue previously?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>
>>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 As in this is essentially doing what lateness *should* have done  And
 I think that is a bug. My code now is . Please look at the allowedLateness
 on the session window.

 SingleOutputStreamOperator>
 filteredKeyedValue = keyedValue
 .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
 "late_filter").uid("late_filter");
 SingleOutputStreamOperator> lateKeyedValue
 = keyedValue
 .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).name
 ("late_data").uid("late_data");
 SingleOutputStreamOperator>
 aggregate = filteredKeyedValue
 .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).
 keyBy(value -> value.getKey())
 .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
 .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
 lateOutputTag)
 .trigger(PurgingTrigger.of(CountTrigger.of(1)))
 .aggregate(new SortAggregate(),
 new SessionIdProcessWindowFunction(this.gapInMinutes, this.
 lateNessInMinutes))
 .name("session_aggregate").uid("session_aggregate");

 On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> I can do that, but I am not certain this is the right filter.  Can you
> please validate. That aside I already have the lateness configured for the
> session window ( the normal withLateNess() )  and this looks like a 
> session
> window was not collected and still is alive for some reason ( a flink bug 
> ?
> )
>
> if (ctx.timestamp() + allowedLateness > ctx.timerService().
> currentWatermark()) {
> out.collect(value);
> }
>
>
> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
> wrote:
>
>> Hi Vishal,
>> based on the error message and the behavior you described,
>> introducing a filter for late events is the way to go - just as described
>> in the SO thread you mentioned. Usually, you would collect late 

Re: Multiple jobs in the same Flink project

2021-04-22 Thread Arvid Heise
Hi Oğuzhan,

I think you know the answer already: it's easiest to have 1 jar per
application. And in most cases, it's easiest to also have 1 repo per
application. You can use the same template for all 3 and all future
applications without any special cases.

My rule of thumb is the following: if the life-cycles of applications are
tightly coupled, they can reside in the same repository. So if
update/restart of app1, also means that app2 needs to be updated/restarted,
then use the same CI/CD process.

If (like in your case) the life cycles are independent, treat them as
separate entities. You can have shared code in a 4th repo or include one repo
in the other repos.

I would not optimize for the number of repos but for the simplicity of a
particular repo. Ultimately, I like to have all repos exactly the same,
using the same gradle plugins or build templates (since I don't enjoy doing
DevOps stuff over and over again). If you use GitLab (and I guess similar
tools), it's very easy to manage a large number of repos.

On Thu, Apr 22, 2021 at 7:42 PM Oğuzhan Mangır <
sosyalmedya.oguz...@gmail.com> wrote:

> We have a flink project with multiple jobs. That means we can submit
> multiple job with the same jar. But there is a limitation here i think.
> Because, let's assume;
>
> I create a flink project with 3 jobs, and create a single jar then put it
> to the flink cluster (all of these steps are working on a ci/cd pipeline,
> and the jar name will be assigned automatically. for example my-jar-v1,
> my-jar-v2 .. ). Then I submit 3 jobs using the same jar.
>
> Later, I changed the job2, then created a new jar with the new version
> e.g. my-jar-v2,  then re-deploy the job2 again with the new jar. But in
> this case, when I look at the submit page in the UI, i don't know which job
> was submitted from the specified jar.
>
> my-jar-v1 => job1, job2, jo3 deployed
> my-jar-v2 => job2 (re-deployed) =>> in this case, i know job2 deployed
> with this jar, but others will not know it because ui does not show this
> information
>
> And also, if any problem occurs in job2 when i deploy it using the
> my-jar-2, i can use the previous jar(my-jar-v1). But if there are a lot of
> jars, it can be very difficult.|
>
> Is there any best practice for that?
>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
And when I added the filter the Exception was not thrown. So the sequence
of events

* Increased lateness from 12 ( that was what it was initially running with
)  to 24 hours
* the pipe ran as desired before it blew up with the Exception
* masked the issue by increasing the lateness to 48 hours.
* It blew up again but now after the added lateness, so essentially the
same issue but added lateness let the pipe run for another few hours.
* Added the Filter upfront  as below, the pipe has no issues. Also metrics
show that no data is being pushed through the sideoutput and that data is
not pulled from a simulated sideout ( below )


public class LateEventFilter<KEY, VALUE> extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
private static final long serialVersionUID = 1L;

long allowedLateness;
public LateEventFilter(long allowedLateness){
this.allowedLateness = allowedLateness;
}
@Override
public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark())
{
out.collect(value);
}
}
}


public class LateEventSideOutput<KEY, VALUE> extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
private static final long serialVersionUID = 1L;

long allowedLateness;
public LateEventSideOutput(long allowedLateness){
this.allowedLateness = allowedLateness;
}
@Override
public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark())
{
out.collect(value);
}
}
}



 I am using RocksDB as a backend if that helps.

On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi 
wrote:

> Yes sir. The allowedLateNess and side output always existed.
>
> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
> wrote:
>
>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>> being added to your pipeline when running into the
>> UnsupportedOperationException issue previously?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>
>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> As in this is essentially doing what lateness *should* have done  And I
>>> think that is a bug. My code now is . Please look at the allowedLateness on
>>> the session window.
>>>
>>> SingleOutputStreamOperator>
>>> filteredKeyedValue = keyedValue
>>> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
>>> "late_filter").uid("late_filter");
>>> SingleOutputStreamOperator> lateKeyedValue
>>> = keyedValue
>>> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).name(
>>> "late_data").uid("late_data");
>>> SingleOutputStreamOperator>
>>> aggregate = filteredKeyedValue
>>> .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).
>>> keyBy(value -> value.getKey())
>>> .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>> .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
>>> lateOutputTag)
>>> .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>> .aggregate(new SortAggregate(),
>>> new SessionIdProcessWindowFunction(this.gapInMinutes, this.
>>> lateNessInMinutes))
>>> .name("session_aggregate").uid("session_aggregate");
>>>
>>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 I can do that, but I am not certain this is the right filter.  Can you
 please validate. That aside I already have the lateness configured for the
 session window ( the normal withLateNess() )  and this looks like a session
 window was not collected and still is alive for some reason ( a flink bug ?
 )

 if (ctx.timestamp() + allowedLateness > ctx.timerService().
 currentWatermark()) {
 out.collect(value);
 }


 On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
 wrote:

> Hi Vishal,
> based on the error message and the behavior you described, introducing
> a filter for late events is the way to go - just as described in the SO
> thread you mentioned. Usually, you would collect late events in some kind
> of side output [1].
>
> I hope that helps.
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> I saw
>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>> and this seems to suggest a straight up filter, but I am not sure how 
>> does
>> that filter works as in would it factor is the lateness when filtering ?
>>
>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>> 

Re: Question about snapshot file

2021-04-22 Thread Abdullah bin Omar
Hi,

I have a savepoint or checkpointed file from my task. However, the file is
binary. I want to see what the file contains.

How is it possible to see what information the file has (or how it is
possible to make it human readable?)

Thank you

On Thu, Apr 22, 2021 at 10:19 AM Matthias Pohl 
wrote:

> Hi Abdullah,
> the metadata file contains handles to the operator states of the
> checkpoint [1]. You might want to have a look into the State Processor API
> [2].
>
> Best,
> Matthias
>
> [1]
> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Thu, Apr 22, 2021 at 4:57 PM Abdullah bin Omar <
> abdullahbinoma...@gmail.com> wrote:
>
>> Hi,
>>
>> (1) what 's the snapshot metadata file (binary) contains ? is it possible
>> to read the snapshot metadata file by using Flink Deserialization?
>>
>> (2) is there any function that can be used to see the previous states on
>> time of operation?
>>
>> Thank you
>>
>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
Yes sir. The allowedLateNess and side output always existed.

On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
wrote:

> You're saying that you used `allowedLateness`/`sideOutputLateData` as
> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
> being added to your pipeline when running into the
> UnsupportedOperationException issue previously?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi 
> wrote:
>
>> As in this is essentially doing what lateness *should* have done  And I
>> think that is a bug. My code now is . Please look at the allowedLateness on
>> the session window.
>>
>> SingleOutputStreamOperator>
>> filteredKeyedValue = keyedValue
>> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
>> "late_filter").uid("late_filter");
>> SingleOutputStreamOperator> lateKeyedValue =
>> keyedValue
>> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).name(
>> "late_data").uid("late_data");
>> SingleOutputStreamOperator>
>> aggregate = filteredKeyedValue
>> .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).
>> keyBy(value -> value.getKey())
>> .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>> .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
>> lateOutputTag)
>> .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>> .aggregate(new SortAggregate(),
>> new SessionIdProcessWindowFunction(this.gapInMinutes, this.
>> lateNessInMinutes))
>> .name("session_aggregate").uid("session_aggregate");
>>
>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I can do that, but I am not certain this is the right filter.  Can you
>>> please validate. That aside I already have the lateness configured for the
>>> session window ( the normal withLateNess() )  and this looks like a session
>>> window was not collected and still is alive for some reason ( a flink bug ?
>>> )
>>>
>>> if (ctx.timestamp() + allowedLateness > ctx.timerService().
>>> currentWatermark()) {
>>> out.collect(value);
>>> }
>>>
>>>
>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
>>> wrote:
>>>
 Hi Vishal,
 based on the error message and the behavior you described, introducing
 a filter for late events is the way to go - just as described in the SO
 thread you mentioned. Usually, you would collect late events in some kind
 of side output [1].

 I hope that helps.
 Matthias

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

 On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> I saw
> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
> and this seems to suggest a straight up filter, but I am not sure how does
> that filter works as in would it factor is the lateness when filtering ?
>
> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Well it was not a solution after all. We now have a session window
>> that is stuck with the same issue albeit  after the additional lateness. 
>> I
>> had increased the lateness to 2 days and that masked the issue which 
>> again
>> reared it's head after the 2 days ;lateness was over ( instead of the 1 
>> day
>> ) before. This is very disconcerting.
>>
>> Caused by: java.lang.UnsupportedOperationException: The end
>> timestamp of an event-time window cannot become earlier than the
>> current watermark by merging. Current watermark: 1619053742129
>> window: TimeWindow{start=161883663, end=1618879580402}
>>
>>
>>
>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hey folks,
>>>I had a pipe with sessionization restarts and then
>>> fail after retries with this exception. The only thing I had done was to
>>> increase the lateness by 12 hours ( to  a day )  in this pipe and 
>>> restart
>>> from SP and it ran for 12 hours plus without issue. I cannot imagine 
>>> that
>>> increasing the lateness created this and the way I solved this was to
>>> increase the lateness further. Could this be if there are TMs in the
>>> cluster whose time is off ( as in not synchronized )  ?
>>>
>>> 2021-04-21 11:27:58
>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>> event-time window cannot become earlier than the current watermark
>>> by merging. Current watermark: 1618966593999 window: TimeWindow
>>> {start=1618878336107, end=1618880140466}
>>> at 

Multiple jobs in the same Flink project

2021-04-22 Thread Oğuzhan Mangır
We have a flink project with multiple jobs. That means we can submit
multiple jobs with the same jar. But I think there is a limitation here.
Let's assume:

I create a flink project with 3 jobs, and create a single jar then put it
to the flink cluster (all of these steps are working on a ci/cd pipeline,
and the jar name will be assigned automatically. for example my-jar-v1,
my-jar-v2 .. ). Then I submit 3 jobs using the same jar.

Later, I changed the job2, then created a new jar with the new version e.g.
my-jar-v2,  then re-deploy the job2 again with the new jar. But in this
case, when I look at the submit page in the UI, i don't know which job was
submitted from the specified jar.

my-jar-v1 => job1, job2, jo3 deployed
my-jar-v2 => job2 (re-deployed) =>> in this case, i know job2 deployed with
this jar, but others will not know it because ui does not show this
information

And also, if any problem occurs in job2 when i deploy it using the
my-jar-2, i can use the previous jar(my-jar-v1). But if there are a lot of
jars, it can be very difficult.|

Is there any best practice for that?


Does the flink run command support reading jar files from a remote file system?

2021-04-22 Thread casel.chen
Does flink run support reading from a remote file system, for example jar files under an oss:// or hdfs:// path? From the source code, a PackagedProgram needs to be constructed, and its constructor takes a File jarFile parameter. I am not sure whether a File object can be constructed from an oss or hdfs path.

Re: [ANNOUNCE] Flink Jira Bot fully live (& Useful Filters to Work with the Bot)

2021-04-22 Thread Matthias Pohl
Thanks for setting this up, Konstantin. +1

On Thu, Apr 22, 2021 at 11:16 AM Konstantin Knauf  wrote:

> Hi everyone,
>
> all of the Jira Bot rules are live now. Particularly in the beginning the
> Jira Bot will be very active, because the rules apply to a lot of old,
> stale tickets. So, if you get a huge amount of emails from the Flink Jira
> Bot right now, this will get better. In any case, the Flink Jira Bot (or
> the rules that it implements) demand some changes to how we work with Jira.
>
> Here are a few points to make this transition easier for us:
>
> *1) Retrospective*
>
> In 3-4 weeks I would like to collect feedback. What is working well? What
> is not working well or getting in your way? Is the bot moving us closer to
> the goals mentioned in the initial email? Specifically, the
> initial parameterization [1] of the bot was kind of an educated guess. I
> will open a [DISCUSS]ion thread to collect feedback and proposals for
> changes around that time.
>
> *2) Use Sub-Tasks*
>
> The bot will ask you for an update on assigned tickets after quite a short
> time for Flink standards. If you are working on a ticket that takes longer,
> consider splitting it up in Sub-Tasks. Activity on Sub-Tasks counts as
> activity for the parent ticket, too. So, as long as any subtask is moving
> along you won't be nagged by the bot.
>
>
> *3) Useful Filters*
>
> You've probably received a lot of emails already, in particular if you are
> watching many tickets. Here are a few JIRA filters to find the tickets,
> that are probably most important to you and have been updated by the bot:
>
> Tickets that *you are assigned to*, which are "stale-assigned"
>
> https://issues.apache.org/jira/issues/?filter=12350499
>
> Tickets that *you reported*, which are stale in anyway:
>
> https://issues.apache.org/jira/issues/?filter=12350500
>
> If you are a maintainer of some components, you might find the following
> filters useful (replace with your own components):
>
> *All tickets that are about to be closed*
> project = FLINK AND component in ("Build System", "BuildSystem / Shaded",
> "Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
> flink-docker, "Release System", "Runtime / Coordination", "Runtime /
> Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
> resolution = Unresolved AND labels in (stale-minor)
>
> *Bugs that are about to be deprioritized or closed*
> project = FLINK AND type = BUG AND component in ("Build System",
> "BuildSystem / Shaded", "Deployment / Kubernetes", "Deployment / Mesos",
> "Deployment / YARN", flink-docker, "Release System", "Runtime /
> Coordination", "Runtime / Metrics", "Runtime / Queryable State", "Runtime /
> REST", Travis) AND resolution = Unresolved AND labels in (stale-major,
> stale-blocker, stale-critical, stale-minor)
>
>
> *Tickets that are stale-assigned, but already have a PR available*project
> = FLINK AND component in ("Build System", "BuildSystem / Shaded",
> "Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
> flink-docker, "Release System", "Runtime / Coordination", "Runtime /
> Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
> resolution = Unresolved AND labels in (stale-assigned) AND labels in
> (pull-request-available)
>
> Cheers,
>
> Konstantin
>
> [1] https://github.com/apache/flink-jira-bot/blob/master/config.yaml
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Matthias Pohl
You're saying that you used `allowedLateness`/`sideOutputLateData` as
described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
being added to your pipeline when running into the
UnsupportedOperationException issue previously?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi 
wrote:

> As in this is essentially doing what lateness *should* have done  And I
> think that is a bug. My code now is . Please look at the allowedLateness on
> the session window.
>
> SingleOutputStreamOperator> filteredKeyedValue
> = keyedValue
> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
> "late_filter").uid("late_filter");
> SingleOutputStreamOperator> lateKeyedValue =
> keyedValue
> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).name(
> "late_data").uid("late_data");
> SingleOutputStreamOperator>
> aggregate = filteredKeyedValue
> .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).keyBy
> (value -> value.getKey())
> .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
> .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
> lateOutputTag)
> .trigger(PurgingTrigger.of(CountTrigger.of(1)))
> .aggregate(new SortAggregate(),
> new SessionIdProcessWindowFunction(this.gapInMinutes, this.
> lateNessInMinutes))
> .name("session_aggregate").uid("session_aggregate");
>
> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi 
> wrote:
>
>> I can do that, but I am not certain this is the right filter.  Can you
>> please validate. That aside I already have the lateness configured for the
>> session window ( the normal withLateNess() )  and this looks like a session
>> window was not collected and still is alive for some reason ( a flink bug ?
>> )
>>
>> if (ctx.timestamp() + allowedLateness > ctx.timerService().
>> currentWatermark()) {
>> out.collect(value);
>> }
>>
>>
>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
>> wrote:
>>
>>> Hi Vishal,
>>> based on the error message and the behavior you described, introducing a
>>> filter for late events is the way to go - just as described in the SO
>>> thread you mentioned. Usually, you would collect late events in some kind
>>> of side output [1].
>>>
>>> I hope that helps.
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>
>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 I saw
 https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
 and this seems to suggest a straight up filter, but I am not sure how does
 that filter works as in would it factor is the lateness when filtering ?

 On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Well it was not a solution after all. We now have a session window
> that is stuck with the same issue albeit  after the additional lateness. I
> had increased the lateness to 2 days and that masked the issue which again
> reared it's head after the 2 days ;lateness was over ( instead of the 1 
> day
> ) before. This is very disconcerting.
>
> Caused by: java.lang.UnsupportedOperationException: The end timestamp
> of an event-time window cannot become earlier than the current
> watermark by merging. Current watermark: 1619053742129 window:
> TimeWindow{start=161883663, end=1618879580402}
>
>
>
> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Hey folks,
>>I had a pipe with sessionization restarts and then
>> fail after retries with this exception. The only thing I had done was to
>> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
>> from SP and it ran for 12 hours plus without issue. I cannot imagine that
>> increasing the lateness created this and the way I solved this was to
>> increase the lateness further. Could this be if there are TMs in the
>> cluster whose time is off ( as in not synchronized )  ?
>>
>> 2021-04-21 11:27:58
>> java.lang.UnsupportedOperationException: The end timestamp of an
>> event-time window cannot become earlier than the current watermark by
>> merging. Current watermark: 1618966593999 window: TimeWindow{start=
>> 1618878336107, end=1618880140466}
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator$2.merge(WindowOperator.java:339)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator$2.merge(WindowOperator.java:321)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> 

Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
As in this is essentially doing what lateness *should* have done  And I
think that is a bug. My code now is . Please look at the allowedLateness on
the session window.

SingleOutputStreamOperator> filteredKeyedValue
= keyedValue
.process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
"late_filter").uid("late_filter");
SingleOutputStreamOperator> lateKeyedValue =
keyedValue
.process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).name(
"late_data").uid("late_data");
SingleOutputStreamOperator> aggregate
= filteredKeyedValue
.filter((f) -> f.key != null && f.timedValue.getEventTime() != null).keyBy(
value -> value.getKey())
.window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
.allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
lateOutputTag)
.trigger(PurgingTrigger.of(CountTrigger.of(1)))
.aggregate(new SortAggregate(),
new SessionIdProcessWindowFunction(this.gapInMinutes, this.
lateNessInMinutes))
.name("session_aggregate").uid("session_aggregate");

On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi 
wrote:

> I can do that, but I am not certain this is the right filter.  Can you
> please validate. That aside I already have the lateness configured for the
> session window ( the normal withLateNess() )  and this looks like a session
> window was not collected and still is alive for some reason ( a flink bug ?
> )
>
> if (ctx.timestamp() + allowedLateness > ctx.timerService().
> currentWatermark()) {
> out.collect(value);
> }
>
>
> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
> wrote:
>
>> Hi Vishal,
>> based on the error message and the behavior you described, introducing a
>> filter for late events is the way to go - just as described in the SO
>> thread you mentioned. Usually, you would collect late events in some kind
>> of side output [1].
>>
>> I hope that helps.
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>
>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I saw
>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>>> and this seems to suggest a straight up filter, but I am not sure how does
>>> that filter works as in would it factor is the lateness when filtering ?
>>>
>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Well it was not a solution after all. We now have a session window that
 is stuck with the same issue albeit  after the additional lateness. I had
 increased the lateness to 2 days and that masked the issue which again
 reared it's head after the 2 days ;lateness was over ( instead of the 1 day
 ) before. This is very disconcerting.

 Caused by: java.lang.UnsupportedOperationException: The end timestamp
 of an event-time window cannot become earlier than the current
 watermark by merging. Current watermark: 1619053742129 window:
 TimeWindow{start=161883663, end=1618879580402}



 On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Hey folks,
>I had a pipe with sessionization restarts and then fail
> after retries with this exception. The only thing I had done was to
> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
> from SP and it ran for 12 hours plus without issue. I cannot imagine that
> increasing the lateness created this and the way I solved this was to
> increase the lateness further. Could this be if there are TMs in the
> cluster whose time is off ( as in not synchronized )  ?
>
> 2021-04-21 11:27:58
> java.lang.UnsupportedOperationException: The end timestamp of an
> event-time window cannot become earlier than the current watermark by
> merging. Current watermark: 1618966593999 window: TimeWindow{start=
> 1618878336107, end=1618880140466}
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator$2.merge(WindowOperator.java:339)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator$2.merge(WindowOperator.java:321)
> at org.apache.flink.streaming.runtime.operators.windowing.
> MergingWindowSet.addWindow(MergingWindowSet.java:209)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:319)
> at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(
> OneInputStreamTask.java:191)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .processElement(StreamTaskNetworkInput.java:204)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:174)
> at 

Re: Question about snapshot file

2021-04-22 Thread Matthias Pohl
Hi Abdullah,
the metadata file contains handles to the operator states of the checkpoint
[1]. You might want to have a look into the State Processor API [2].

Best,
Matthias

[1]
https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
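
As a rough sketch of what reading such a file with the State Processor API can look like (the savepoint path, operator uid, and state name are placeholders; the DataSet-based API from the Flink 1.12 docs is assumed):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadSavepointExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Point at the savepoint/checkpoint directory that contains the _metadata file.
        ExistingSavepoint savepoint =
                Savepoint.load(env, "hdfs:///savepoints/savepoint-xxxx", new MemoryStateBackend());

        // Read keyed state written by the operator with uid "my-operator".
        DataSet<Long> counts = savepoint.readKeyedState("my-operator", new ReaderFunction());
        counts.print();
    }

    static class ReaderFunction extends KeyedStateReaderFunction<String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
            out.collect(count.value());
        }
    }
}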

On Thu, Apr 22, 2021 at 4:57 PM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> (1) what 's the snapshot metadata file (binary) contains ? is it possible
> to read the snapshot metadata file by using Flink Deserialization?
>
> (2) is there any function that can be used to see the previous states on
> time of operation?
>
> Thank you
>


Re: Debezium CDC | OOM

2021-04-22 Thread Matthias Pohl
Hi Ayush,
Which state backend have you configured [1]? Have you considered trying out
RocksDB [2]? RocksDB might help with persisting at least keyed state.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#choose-the-right-state-backend
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
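
For reference, a minimal sketch of switching a job to the RocksDB state backend with incremental checkpoints (the checkpoint URI and interval are placeholders; the flink-statebackend-rocksdb dependency is assumed, and the same can be set in flink-conf.yaml instead):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keyed state lives in RocksDB on local disk instead of the JVM heap;
        // "true" enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(15 * 60 * 1000L); // 15 min, as in the original setup

        env.fromElements("a", "b", "c").print();
        env.execute("rocksdb-backend-example");
    }
}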

On Thu, Apr 22, 2021 at 7:52 AM Ayush Chauhan 
wrote:

> Hi,
> I am using flink cdc to stream CDC changes into an iceberg table. When I
> first run the flink job for a topic which has all the data for a table, it
> runs out of heap memory as flink tries to load all the data during my 15min
> checkpointing interval. Right now, the only solution I have is to pass *-ytm
> 8192 -yjm 2048m* for a table with 10M rows and then reduce it after flink
> has consumed all the data. Is there a way to tell the flink cdc code to trigger
> a checkpoint or to throttle the consumption speed (I think backpressure should
> have handled this)?
>
> --
>  Ayush Chauhan
>  Software Engineer | Data Platform
>  [image: mobile-icon]  +91 9990747111
>
>
> This email is intended only for the person or the entity to whom it is
> addressed. If you are not the intended recipient, please delete this email
> and contact the sender.
>


Question about snapshot file

2021-04-22 Thread Abdullah bin Omar
Hi,

(1) what 's the snapshot metadata file (binary) contains ? is it possible
to read the snapshot metadata file by using Flink Deserialization?

(2) is there any function that can be used to see the previous states on
time of operation?

Thank you


Re: Kubernetes Setup - JM as job vs JM as deployment

2021-04-22 Thread Matthias Pohl
Hi Gil,
I'm not sure whether I understand you correctly. What do you mean by
deploying the job manager as "job" or "deployment"? Are you referring to
the different deployment modes, Flink offers [1]? These would be
independent of Kubernetes. Or do you wonder what the differences are
between the Flink on Kubernetes (native) [2] vs Flink on Kubernetes
(standalone using YAML files)?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/#deployment-modes
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html

On Wed, Apr 21, 2021 at 11:19 PM Gil Amsalem 
wrote:

> Hi,
>
> I found that there are 2 different approaches to setup Flink over
> kubernetes.
> 1. Deploy job manager as Job.
> 2. Deploy job manager as Deployment.
>
> What is the recommended way? What are the benefits of each?
>
> Thanks,
> Gil Amsalem
>


Re: Long to Timestamp(3) Conversion

2021-04-22 Thread Matthias Pohl
Hi Aeden,
there are some improvements to time conversions coming up in Flink 1.13.
For now, the best solution to overcome this is to provide a user-defined
function.
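
A rough sketch of such a UDF (the function name and field names are made up; epoch milliseconds are assumed for the Long column):

import java.sql.Timestamp;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// Converts an epoch-millisecond Long column to TIMESTAMP(3).
public class MillisToTimestamp extends ScalarFunction {
    public @DataTypeHint("TIMESTAMP(3)") Timestamp eval(Long epochMillis) {
        return epochMillis == null ? null : new Timestamp(epochMillis);
    }
}

// Registration and use, with tEnv and myDatastream as in the question:
//   tEnv.createTemporarySystemFunction("MILLIS_TO_TS", MillisToTimestamp.class);
//   tEnv.createTemporaryView("myTable", myDatastream);
//   tEnv.sqlQuery("SELECT MILLIS_TO_TS(myLongTS) AS myTs FROM myTable");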

Hope, that helps.
Best,
Matthias

On Wed, Apr 21, 2021 at 9:36 PM Aeden Jameson 
wrote:

> I've probably overlooked something simple, but when converting a
> datastream to a table how does one convert a long to timestamp(3) that
> will not be your event or proc time.
>
> I've tried
>
> tEnv.createTemporaryView(
> "myTable"
> ,myDatastream
> ,
> ,$("myLongTS").toTimestamp()
> )
> which produces the exception,
>
> org.apache.flink.table.api.ValidationException: Field reference
> expression or alias on field expression expected.
>
> I've also tried,
>
> $("myLongTS").toTimestamp().as("myLongTS")
> $("myLongTS as myLongTS").toTimestamp()
>
> Haven't found Googling to be of much help on this one.
>
> --
> Thank You,
> Aeden


Reply: Flink 1.12.0 checkpoints fail every few hours

2021-04-22 Thread 田向阳
Sigh, this problem is a real headache. I still haven't found the cause. Please let me know if you figure it out on your side.


田向阳
lucas_...@163.com

On 2021-04-22 20:56, 张锴 wrote:
Hi, I've run into this problem as well. How did you configure your checkpoints? Could I use your configuration as a reference?

Haihang Jing  wrote on Tuesday, March 23, 2021 at 8:04 PM:

> Hi, have you located the problem yet?
> I've run into the same problem, and it seems related to the checkpoint interval.
> I have two identical jobs (checkpoint interval set to 3 minutes), one running on
> Flink 1.9 and one on Flink 1.12. The 1.9 job runs stably, while the 1.12 job fails
> to complete checkpoints after about 5 hours of running and throws
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
> tolerable failure threshold.
> After I increased the checkpoint interval to 10 minutes, the 1.12 job also runs stably, so I suspect it is related to the checkpoint interval.
>
> From an issue I read, I learned that Flink 1.10 adjusted the checkpoint mechanism: during barrier alignment the receiver no longer buffers data that arrives after a single barrier, which means the sender has to wait for credit
> feedback after barrier alignment before transmitting data, so the sender incurs a certain cold start that affects latency and network throughput. However, I am not sure whether this is really the cause, or how to assess its impact.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
The only thing I can think of is to add the configured lateness to the
filter as done here, i.e. the time on the element + lateness should always
be greater than the current WM. The current issue is:



Mon Apr 19 20:46:20 EDT 2021.  Window end

Wed Apr 21 21:09:02 EDT 2021,  WM


an event forced this merged window, and it is likely that it has the time
of Mon Apr 19 20:46:20 EDT 2021. We filter this event out so as not to hit
https://github.com/aljoscha/flink/blob/2836eccc8498de7a1cad083e6102944471bbd350/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L125


Either way the solution is yucky, and I am not sure how it happened in the first
place.


public class LateEventFilter<KEY, VALUE> extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
private static final long serialVersionUID = 1L;

long allowedLateness;
public LateEventFilter(long allowedLateness){
this.allowedLateness = allowedLateness;
}
@Override
public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark())
{
out.collect(value);
}
}
}

On Thu, Apr 22, 2021 at 8:52 AM Vishal Santoshi 
wrote:

> I saw
> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
> and this seems to suggest a straight up filter, but I am not sure how does
> that filter works as in would it factor is the lateness when filtering ?
>
> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi 
> wrote:
>
>> Well it was not a solution after all. We now have a session window that
>> is stuck with the same issue albeit  after the additional lateness. I had
>> increased the lateness to 2 days and that masked the issue which again
>> reared it's head after the 2 days ;lateness was over ( instead of the 1 day
>> ) before. This is very disconcerting.
>>
>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
>> an event-time window cannot become earlier than the current watermark by
>> merging. Current watermark: 1619053742129 window: TimeWindow{start=
>> 161883663, end=1618879580402}
>>
>>
>>
>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hey folks,
>>>I had a pipe with sessionization restarts and then fail
>>> after retries with this exception. The only thing I had done was to
>>> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
>>> from SP and it ran for 12 hours plus without issue. I cannot imagine that
>>> increasing the lateness created this and the way I solved this was to
>>> increase the lateness further. Could this be if there are TMs in the
>>> cluster whose time is off ( as in not synchronized )  ?
>>>
>>> 2021-04-21 11:27:58
>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>> event-time window cannot become earlier than the current watermark by
>>> merging. Current watermark: 1618966593999 window: TimeWindow{start=
>>> 1618878336107, end=1618880140466}
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator$2.merge(WindowOperator.java:339)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator$2.merge(WindowOperator.java:321)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator.processElement(WindowOperator.java:319)
>>> at org.apache.flink.streaming.runtime.tasks.
>>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>>> .java:191)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .processElement(StreamTaskNetworkInput.java:204)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:174)
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:65)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>> StreamTask.java:396)
>>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxLoop(MailboxProcessor.java:191)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(StreamTask.java:617)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:581)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>


Re: MemoryStateBackend Issue

2021-04-22 Thread Matthias Pohl
Hi Milind,
I bet someone else might have a faster answer. But could you provide the
logs and config to get a better understanding of what your issue is?
In general, the state is maintained even in cases where a TaskManager fails.

Best,
Matthias
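
For reference, a minimal sketch of configuring checkpointing and a durable
state backend explicitly (Flink 1.11/1.12-style API; the HDFS path and the
interval are placeholders). Note that a manually restarted job only picks up
earlier state if it is started from a retained checkpoint or savepoint
(e.g. flink run -s <checkpoint/savepoint path> ...):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(10_000);  // checkpoint every 10s
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// durable backend instead of the default in-memory one
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));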

On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya  wrote:

> Hi
>
> I see MemoryStateBackend being used in TM Log
>
> org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend
> has been configured, using default (Memory / JobManager)
> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
> maxStateSize: 5242880)
>
>
>
> I am logging checkpointed value which is just message count
>
> Snapshot the state 500
> Snapshot the state 1000
>
>
> When I restart the job i.e. new TM but the job manager is same I see
>
> Snapshot the state 500
>
> In the JM logs I see following entries
>
> Triggering checkpoint 1
> Triggering checkpoint 2
>
> After restarting job hence new TM
>
> Triggering checkpoint 1
>
> As per my understanding the JM should hold the checkpointed
> state across TMs? Am I correct?
>
> I have not configured anything special and using default. Do I need to add
> any setting to make it work ?
> I want to maintain message count across the TMs.
>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
I can do that, but I am not certain this is the right filter. Can you
please validate? That aside, I already have the lateness configured for the
session window (the normal allowedLateness()), and this looks like a session
window that was not collected and is still alive for some reason (a Flink
bug?).

if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark())
{
out.collect(value);
}


On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
wrote:

> Hi Vishal,
> based on the error message and the behavior you described, introducing a
> filter for late events is the way to go - just as described in the SO
> thread you mentioned. Usually, you would collect late events in some kind
> of side output [1].
>
> I hope that helps.
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi 
> wrote:
>
>> I saw
>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>> and this seems to suggest a straight up filter, but I am not sure how does
>> that filter works as in would it factor is the lateness when filtering ?
>>
>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Well it was not a solution after all. We now have a session window that
>>> is stuck with the same issue albeit  after the additional lateness. I had
>>> increased the lateness to 2 days and that masked the issue which again
>>> reared it's head after the 2 days ;lateness was over ( instead of the 1 day
>>> ) before. This is very disconcerting.
>>>
>>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
>>> an event-time window cannot become earlier than the current watermark by
>>> merging. Current watermark: 1619053742129 window: TimeWindow{start=
>>> 161883663, end=1618879580402}
>>>
>>>
>>>
>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Hey folks,
I had a pipe with sessionization restarts and then fail
 after retries with this exception. The only thing I had done was to
 increase the lateness by 12 hours ( to  a day )  in this pipe and restart
 from SP and it ran for 12 hours plus without issue. I cannot imagine that
 increasing the lateness created this and the way I solved this was to
 increase the lateness further. Could this be if there are TMs in the
 cluster whose time is off ( as in not synchronized )  ?

 2021-04-21 11:27:58
 java.lang.UnsupportedOperationException: The end timestamp of an
 event-time window cannot become earlier than the current watermark by
 merging. Current watermark: 1618966593999 window: TimeWindow{start=
 1618878336107, end=1618880140466}
 at org.apache.flink.streaming.runtime.operators.windowing.
 WindowOperator$2.merge(WindowOperator.java:339)
 at org.apache.flink.streaming.runtime.operators.windowing.
 WindowOperator$2.merge(WindowOperator.java:321)
 at org.apache.flink.streaming.runtime.operators.windowing.
 MergingWindowSet.addWindow(MergingWindowSet.java:209)
 at org.apache.flink.streaming.runtime.operators.windowing.
 WindowOperator.processElement(WindowOperator.java:319)
 at org.apache.flink.streaming.runtime.tasks.
 OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(
 OneInputStreamTask.java:191)
 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
 .processElement(StreamTaskNetworkInput.java:204)
 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
 .emitNext(StreamTaskNetworkInput.java:174)
 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
 .processInput(StreamOneInputProcessor.java:65)
 at org.apache.flink.streaming.runtime.tasks.StreamTask
 .processInput(StreamTask.java:396)
 at org.apache.flink.streaming.runtime.tasks.mailbox.
 MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
 at org.apache.flink.streaming.runtime.tasks.StreamTask
 .runMailboxLoop(StreamTask.java:617)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
 StreamTask.java:581)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
 at java.lang.Thread.run(Thread.java:748)



>


Re: Flink Hadoop config on docker-compose

2021-04-22 Thread Matthias Pohl
I think you're right, Flavio. I created FLINK-22414 to cover this. Thanks
for bringing it up.

Matthias

[1] https://issues.apache.org/jira/browse/FLINK-22414

On Fri, Apr 16, 2021 at 9:32 AM Flavio Pompermaier 
wrote:

> Hi Yang,
> isn't this something to fix? If I look at the documentation at  [1], in
> the "Passing configuration via environment variables" section, there is:
>
> "The environment variable FLINK_PROPERTIES should contain a list of Flink
> cluster configuration options separated by new line,
> the same way as in the flink-conf.yaml. FLINK_PROPERTIES takes precedence
> over configurations in flink-conf.yaml."
>
> To me this means that if I specify "env.hadoop.conf.dir" it should be
> handled as well. Am I wrong?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html
>
> Best,
> Flavio
>
> On Fri, Apr 16, 2021 at 4:52 AM Yang Wang  wrote:
>
>> It seems that we do not export HADOOP_CONF_DIR as environment variables
>> in current implementation, even though we have set the env.xxx flink config
>> options. It is only used to construct the classpath for the JM/TM process.
>> However, in "HadoopUtils"[2] we do not support getting the hadoop
>> configuration from classpath.
>>
>>
>> [1].
>> https://github.com/apache/flink/blob/release-1.11/flink-dist/src/main/flink-bin/bin/config.sh#L256
>> [2].
>> https://github.com/apache/flink/blob/release-1.11/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java#L64
>>
>>
>> Best,
>> Yang
>>
>> Best,
>> Yang
>>
>> Flavio Pompermaier  于2021年4月16日周五 上午3:55写道:
>>
>>> Hi Robert,
>>> indeed my docker-compose does work only if I add also Hadoop and yarn
>>> home while I was expecting that those two variables were generated
>>> automatically just setting env.xxx variables in FLINK_PROPERTIES variable..
>>>
>>> I just want to understand what to expect, if I really need to specify
>>> Hadoop and yarn home as env variables or not
>>>
>>> Il gio 15 apr 2021, 20:39 Robert Metzger  ha
>>> scritto:
>>>
 Hi,

 I'm not aware of any known issues with Hadoop and Flink on Docker.

 I also tried what you are doing locally, and it seems to work:

 flink-jobmanager| 2021-04-15 18:37:48,300 INFO
  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Starting
 StandaloneSessionClusterEntrypoint.
 flink-jobmanager| 2021-04-15 18:37:48,338 INFO
  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
 default filesystem.
 flink-jobmanager| 2021-04-15 18:37:48,375 INFO
  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
 security context.
 flink-jobmanager| 2021-04-15 18:37:48,404 INFO
  org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop
 user set to flink (auth:SIMPLE)
 flink-jobmanager| 2021-04-15 18:37:48,408 INFO
  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas
 file will be created as /tmp/jaas-811306162058602256.conf.
 flink-jobmanager| 2021-04-15 18:37:48,415 INFO
  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
 Initializing cluster services.

 Here's my code:

 https://gist.github.com/rmetzger/0cf4ba081d685d26478525bf69c7bd39

 Hope this helps!

 On Wed, Apr 14, 2021 at 5:37 PM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Hi everybody,
> I'm trying to set up reading from HDFS using docker-compose and Flink
> 1.11.3.
> If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir'
> using FLINK_PROPERTIES (under environment section of the docker-compose
> service) I see in the logs the following line:
>
> "Could not find Hadoop configuration via any of the supported method"
>
> If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not
> generated by the run scripts.
> Indeed, If I add HADOOP_CONF_DIR and YARN_CONF_DIR (always under
> environment section of the docker-compose service) I don't see that line.
>
> Is this the expected behavior?
>
> Below the relevant docker-compose service I use (I've removed the
> content of HADOOP_CLASSPATH content because is too long and I didn't 
> report
> the taskmanager that is similar):
>
> flink-jobmanager:
> container_name: flink-jobmanager
> build:
>   context: .
>   dockerfile: Dockerfile.flink
>   args:
> FLINK_VERSION: 1.11.3-scala_2.12-java11
> image: 'flink-test:1.11.3-scala_2.12-java11'
> ports:
>   - "8091:8081"
>   - "8092:8082"
> command: jobmanager
> environment:
>   - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: flink-jobmanager
> rest.port: 8081
> 

Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Matthias Pohl
Hi Vishal,
based on the error message and the behavior you described, introducing a
filter for late events is the way to go - just as described in the SO
thread you mentioned. Usually, you would collect late events in some kind
of side output [1].

I hope that helps.
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
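
For completeness, a rough sketch of that side-output approach on a session
window (Event, Result, SessionAggregate and the gap/lateness values are
placeholders, not taken from the original job):

final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Result> sessions = events
        .keyBy(Event::getKey)
        .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
        .allowedLateness(Time.days(1))
        .sideOutputLateData(lateTag)   // late records go here instead of being dropped
        .aggregate(new SessionAggregate());

DataStream<Event> lateEvents = sessions.getSideOutput(lateTag);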

On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi 
wrote:

> I saw
> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
> and this seems to suggest a straight up filter, but I am not sure how does
> that filter works as in would it factor is the lateness when filtering ?
>
> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi 
> wrote:
>
>> Well it was not a solution after all. We now have a session window that
>> is stuck with the same issue albeit  after the additional lateness. I had
>> increased the lateness to 2 days and that masked the issue which again
>> reared it's head after the 2 days ;lateness was over ( instead of the 1 day
>> ) before. This is very disconcerting.
>>
>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
>> an event-time window cannot become earlier than the current watermark by
>> merging. Current watermark: 1619053742129 window: TimeWindow{start=
>> 161883663, end=1618879580402}
>>
>>
>>
>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hey folks,
>>>I had a pipe with sessionization restarts and then fail
>>> after retries with this exception. The only thing I had done was to
>>> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
>>> from SP and it ran for 12 hours plus without issue. I cannot imagine that
>>> increasing the lateness created this and the way I solved this was to
>>> increase the lateness further. Could this be if there are TMs in the
>>> cluster whose time is off ( as in not synchronized )  ?
>>>
>>> 2021-04-21 11:27:58
>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>> event-time window cannot become earlier than the current watermark by
>>> merging. Current watermark: 1618966593999 window: TimeWindow{start=
>>> 1618878336107, end=1618880140466}
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator$2.merge(WindowOperator.java:339)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator$2.merge(WindowOperator.java:321)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator.processElement(WindowOperator.java:319)
>>> at org.apache.flink.streaming.runtime.tasks.
>>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>>> .java:191)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .processElement(StreamTaskNetworkInput.java:204)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:174)
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:65)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>> StreamTask.java:396)
>>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxLoop(MailboxProcessor.java:191)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(StreamTask.java:617)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:581)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>


Re: Flink Statefun Python Batch

2021-04-22 Thread Igal Shilman
Hi Tim,

I've created a tiny PoC, let me know if this helps,
I can't guarantee tho, that this is how we'll eventually approach this, but
it should be somewhere along these lines.

https://github.com/igalshilman/flink-statefun/tree/tim

Thanks,
Igal.


On Thu, Apr 22, 2021 at 6:53 AM Timothy Bess  wrote:

> Hi Igal and Konstantin,
>
> Wow! I appreciate the offer of creating a branch to test with, but for now
> we were able to get it working by tuning a few configs and moving other
> blocking IO out of statefun, so no rush there. That said if you do add
> that, I'd definitely switch over.
>
> That's great! I'll try to think up some suggestions to put into those
> tickets. Yeah I'd be up for a call on Thursday or Friday If you're free
> then, just let me know (my timezone is EDT).
>
> Thanks,
>
> Tim
>
> On Wed, Apr 21, 2021, 4:18 AM Konstantin Knauf 
> wrote:
>
>> Hi Igal, Hi Timothy,
>>
>> this sounds very interesting. Both state introspection as well as
>> OpenTracing support have been requested by multiple users before, so
>> certainly something we are willing to invest into. Timothy, would you have
>> time for a 30min call in the next days to understand your use case and
>> requirements better? In the meantime, let's document these feature requests
>> in Jira.
>>
>> * Exposing Batches to SDKs:
>> https://issues.apache.org/jira/browse/FLINK-22389
>> * Support for OpenTracing:
>> https://issues.apache.org/jira/browse/FLINK-22390
>> * Support for State Introspection:
>> https://issues.apache.org/jira/browse/FLINK-22391
>>
>> Please feel free to edit, comment on these issues directly, too.
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>>
>> Am Mi., 21. Apr. 2021 um 09:15 Uhr schrieb Igal Shilman <
>> i...@ververica.com>:
>>
>>> Hi Tim,
>>>
>>> Yes, I think that this feature can be implemented relatively fast.
>>> If this blocks you at the moment, I can prepare a branch for you to
>>> experiment with, in the following days.
>>>
>>> Regarding to open tracing integration, I think the community can benefit
>>> a lot out of this,
>>> and definitely contributions are welcome!
>>>
>>> @Konstantin Knauf  would you like to understand more
>>> in depth, Tim's use case with opentracing?
>>>
>>> Thanks,
>>> Igal.
>>>
>>>
>>>
>>> On Tue, Apr 20, 2021 at 8:10 PM Timothy Bess  wrote:
>>>
 Hi Igal,

 Yes! that's exactly what I was thinking. The batching will naturally
 happen as the model applies backpressure. We're using pandas and it's
 pretty costly to create a dataframe and everything to process a single
 event. Internally the SDK has access to the batch and is calling my
 function, which creates a dataframe for each individual event. This causes
 a ton of overhead since we basically get destroyed by the constant factors
 around creating and operating on dataframes.

 Knowing how the SDK works, it seems like it'd be easy to do something
 like your example and maybe have a different decorator for "batch
 functions" where the SDK just passes in everything at once.

 Also just out of curiosity are there plans to build out more
 introspection into statefun's flink state? I was thinking it would be super
 useful to add either Queryable state or have some control topic that
 statefun listens to that allows me to send events to introspect or modify
 flink state.

 For example like:

 // control topic request
 {"type": "FunctionIdsReq", "namespace": "foo", "type": "bar"}
 // response
 {"type": "FunctionIdsResp", "ids": [ "1", "2", "3", ... ] }

 Or

 {"type": "SetState", "namespace": "foo", "type": "bar", "id": "1",
 value: "base64bytes"}
 {"type": "DeleteState", "namespace": "foo", "type": "bar", "id": "1"}

 Also having opentracing integration where Statefun passes b3 headers
 with each request so we can trace a message's route through statefun would
 be _super_ useful. We'd literally be able to see the entire path of an
 event from ingress to egress and time spent in each function. Not sure if
 there are any plans around that, but since we're live with a statefun
 project now, it's possible we could contribute some if you guys are open to
 it.

 Thanks,

 Tim

 On Tue, Apr 20, 2021 at 9:25 AM Igal Shilman 
 wrote:

> Hi Tim!
>
> Indeed the StateFun SDK / StateFun runtime, has an internal concept of
> batching, that kicks in the presence of a slow
> /congested remote function. Keep in mind that under normal
> circumstances batching does not happen (effectively a batch of size 1 will
> be sent). [1]
> This batch is not currently exposed via the SDKs (both Java and
> Python) as it is an implementation detail (see [2]).
>
> The way I understand your message (please correct me if I'm wrong): is
> that evaluation of the ML model is costly, and it would benefit from some
> sort of batching (like 

Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread dhanesh arole
Hi,

The questions that @matth...@ververica.com asked are very valid and might
provide more leads. But if you haven't already, it's worth trying jemalloc /
tcmalloc. We had similar problems with slow growth in TM memory resulting in
pods getting OOMed by k8s. After switching to jemalloc, the memory footprint
improved dramatically.


-
Dhanesh Arole ( Sent from mobile device. Pardon me for typos )



On Thu, Apr 22, 2021 at 1:39 PM Matthias Pohl 
wrote:

> Hi,
> I have a few questions about your case:
> * What is the option you're referring to for the bounded shuffle? That
> might help to understand what streaming mode solution you're looking for.
> * What does the job graph look like? Are you assuming that it's due to a
> shuffling operation? Could you provide the logs to get a better
> understanding of your case?
> * Do you observe the same memory increase for other TaskManager nodes?
> * Are you expecting to reach the memory limits considering that you
> mentioned a "big state size"? Would increasing the memory limit be an
> option or do you fear that it's caused by some memory leak?
>
> Bet,
> Matthias
>
> On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:
>
>> The Flink version we used is 1.12.0.
>>
>> 马阳阳
>> ma_yang_y...@163.com
>>
>> 
>> Signature customized with 网易邮箱大师 (NetEase Mail Master)
>>
>> On 04/16/2021 16:07,马阳阳 
>> wrote:
>>
>> Hi, community,
>> When running a Flink streaming job with big state size, one task manager
>> process was killed by the yarn node manager. The following log is from the
>> yarn node manager:
>>
>> 2021-04-16 11:51:23,013 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container
>> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
>> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
>> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
>> Killing container.
>>
>> When searching solution for this problem, I found that there is a option
>> for this that worked for bounded shuffle. So is there a way to get rid of
>> this in streaming mode?
>>
>> PS:
>> memory related options:
>> taskmanager.memory.process.size:12288m
>> taskmanager.memory.managed.fraction:0.7
>>
>>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
Well, it was not a solution after all. We now have a session window that is
stuck with the same issue, albeit after the additional lateness. I had
increased the lateness to 2 days and that masked the issue, which again
reared its head after the 2 days of lateness were over (instead of the 1 day
before). This is very disconcerting.

Caused by: java.lang.UnsupportedOperationException: The end timestamp of an
event-time window cannot become earlier than the current watermark by
merging. Current watermark: 1619053742129 window: TimeWindow{start=
161883663, end=1618879580402}



On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi 
wrote:

> Hey folks,
>I had a pipe with sessionization restarts and then fail
> after retries with this exception. The only thing I had done was to
> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
> from SP and it ran for 12 hours plus without issue. I cannot imagine that
> increasing the lateness created this and the way I solved this was to
> increase the lateness further. Could this be if there are TMs in the
> cluster whose time is off ( as in not synchronized )  ?
>
> 2021-04-21 11:27:58
> java.lang.UnsupportedOperationException: The end timestamp of an
> event-time window cannot become earlier than the current watermark by
> merging. Current watermark: 1618966593999 window: TimeWindow{start=
> 1618878336107, end=1618880140466}
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator$2.merge(WindowOperator.java:339)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator$2.merge(WindowOperator.java:321)
> at org.apache.flink.streaming.runtime.operators.windowing.
> MergingWindowSet.addWindow(MergingWindowSet.java:209)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:319)
> at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
> .java:191)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .processElement(StreamTaskNetworkInput.java:204)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:174)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:396)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:191)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:617)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:581)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:748)
>
>
>


Re: Flink Hadoop config on docker-compose

2021-04-22 Thread Flavio Pompermaier
Great! Thanks for the support

On Thu, Apr 22, 2021 at 2:57 PM Matthias Pohl 
wrote:

> I think you're right, Flavio. I created FLINK-22414 to cover this. Thanks
> for bringing it up.
>
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-22414
>
> On Fri, Apr 16, 2021 at 9:32 AM Flavio Pompermaier 
> wrote:
>
>> Hi Yang,
>> isn't this something to fix? If I look at the documentation at  [1], in
>> the "Passing configuration via environment variables" section, there is:
>>
>> "The environment variable FLINK_PROPERTIES should contain a list of Flink
>> cluster configuration options separated by new line,
>> the same way as in the flink-conf.yaml. FLINK_PROPERTIES takes precedence
>> over configurations in flink-conf.yaml."
>>
>> To me this means that if I specify "env.hadoop.conf.dir" it should be
>> handled as well. Am I wrong?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html
>>
>> Best,
>> Flavio
>>
>> On Fri, Apr 16, 2021 at 4:52 AM Yang Wang  wrote:
>>
>>> It seems that we do not export HADOOP_CONF_DIR as environment variables
>>> in current implementation, even though we have set the env.xxx flink config
>>> options. It is only used to construct the classpath for the JM/TM process.
>>> However, in "HadoopUtils"[2] we do not support getting the hadoop
>>> configuration from classpath.
>>>
>>>
>>> [1].
>>> https://github.com/apache/flink/blob/release-1.11/flink-dist/src/main/flink-bin/bin/config.sh#L256
>>> [2].
>>> https://github.com/apache/flink/blob/release-1.11/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java#L64
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Best,
>>> Yang
>>>
>>> Flavio Pompermaier  于2021年4月16日周五 上午3:55写道:
>>>
 Hi Robert,
 indeed my docker-compose does work only if I add also Hadoop and yarn
 home while I was expecting that those two variables were generated
 automatically just setting env.xxx variables in FLINK_PROPERTIES variable..

 I just want to understand what to expect, if I really need to specify
 Hadoop and yarn home as env variables or not

 Il gio 15 apr 2021, 20:39 Robert Metzger  ha
 scritto:

> Hi,
>
> I'm not aware of any known issues with Hadoop and Flink on Docker.
>
> I also tried what you are doing locally, and it seems to work:
>
> flink-jobmanager| 2021-04-15 18:37:48,300 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 
> Starting
> StandaloneSessionClusterEntrypoint.
> flink-jobmanager| 2021-04-15 18:37:48,338 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
> default filesystem.
> flink-jobmanager| 2021-04-15 18:37:48,375 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
> security context.
> flink-jobmanager| 2021-04-15 18:37:48,404 INFO
>  org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop
> user set to flink (auth:SIMPLE)
> flink-jobmanager| 2021-04-15 18:37:48,408 INFO
>  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas
> file will be created as /tmp/jaas-811306162058602256.conf.
> flink-jobmanager| 2021-04-15 18:37:48,415 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> Initializing cluster services.
>
> Here's my code:
>
> https://gist.github.com/rmetzger/0cf4ba081d685d26478525bf69c7bd39
>
> Hope this helps!
>
> On Wed, Apr 14, 2021 at 5:37 PM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> Hi everybody,
>> I'm trying to set up reading from HDFS using docker-compose and Flink
>> 1.11.3.
>> If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir'
>> using FLINK_PROPERTIES (under environment section of the docker-compose
>> service) I see in the logs the following line:
>>
>> "Could not find Hadoop configuration via any of the supported method"
>>
>> If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not
>> generated by the run scripts.
>> Indeed, If I add HADOOP_CONF_DIR and YARN_CONF_DIR (always under
>> environment section of the docker-compose service) I don't see that line.
>>
>> Is this the expected behavior?
>>
>> Below the relevant docker-compose service I use (I've removed the
>> content of HADOOP_CLASSPATH content because is too long and I didn't 
>> report
>> the taskmanager that is similar):
>>
>> flink-jobmanager:
>> container_name: flink-jobmanager
>> build:
>>   context: .
>>   dockerfile: Dockerfile.flink
>>   args:
>> FLINK_VERSION: 1.11.3-scala_2.12-java11
>> image: 'flink-test:1.11.3-scala_2.12-java11'
>> ports:
>>   - "8091:8081"
>>   - 

Re: Flink 1.12.0 checkpoints fail every few hours

2021-04-22 Thread 张锴
Hi, I have run into this problem as well. How is your checkpoint configured? Could you share it for reference?
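
For reference, a minimal sketch of the checkpoint settings usually involved
in this kind of tuning (the values are placeholders, not a recommendation):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(3 * 60 * 1000);                          // checkpoint interval
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);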

Haihang Jing  wrote on Tue, Mar 23, 2021 at 8:04 PM:

> Hi, have you located the problem yet?
> I ran into the same issue, and it seems related to the checkpoint interval.
> I have two identical jobs (checkpoint interval set to 3 minutes), one
> running on flink1.9 and one on flink1.12. The 1.9 job runs stably, while
> the 1.12 job starts failing to produce checkpoints after about 5 hours,
> throwing org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
> tolerable failure threshold.
> After I increased the checkpoint interval to 10 minutes, the 1.12 job also
> runs stably, so I suspect it is related to the checkpointing interval.
>
> I saw an issue describing that the checkpointing mechanism was adjusted
> after flink 1.10: during barrier alignment the receiver no longer buffers
> data that arrives after a single barrier, which means the sender has to
> wait for credit feedback after alignment before transferring data, so the
> sender goes through a certain cold start that affects latency and network
> throughput. But I am not sure whether this is really the cause, or how to
> measure the impact.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
I saw
https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
and this seems to suggest a straight-up filter, but I am not sure how that
filter works, i.e. whether it would factor in the lateness when filtering?

On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi 
wrote:

> Well it was not a solution after all. We now have a session window that is
> stuck with the same issue albeit  after the additional lateness. I had
> increased the lateness to 2 days and that masked the issue which again
> reared it's head after the 2 days ;lateness was over ( instead of the 1 day
> ) before. This is very disconcerting.
>
> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
> an event-time window cannot become earlier than the current watermark by
> merging. Current watermark: 1619053742129 window: TimeWindow{start=
> 161883663, end=1618879580402}
>
>
>
> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi 
> wrote:
>
>> Hey folks,
>>I had a pipe with sessionization restarts and then fail
>> after retries with this exception. The only thing I had done was to
>> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
>> from SP and it ran for 12 hours plus without issue. I cannot imagine that
>> increasing the lateness created this and the way I solved this was to
>> increase the lateness further. Could this be if there are TMs in the
>> cluster whose time is off ( as in not synchronized )  ?
>>
>> 2021-04-21 11:27:58
>> java.lang.UnsupportedOperationException: The end timestamp of an
>> event-time window cannot become earlier than the current watermark by
>> merging. Current watermark: 1618966593999 window: TimeWindow{start=
>> 1618878336107, end=1618880140466}
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator$2.merge(WindowOperator.java:339)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator$2.merge(WindowOperator.java:321)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> MergingWindowSet.addWindow(MergingWindowSet.java:209)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator.processElement(WindowOperator.java:319)
>> at org.apache.flink.streaming.runtime.tasks.
>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>> .java:191)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .processElement(StreamTaskNetworkInput.java:204)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .emitNext(StreamTaskNetworkInput.java:174)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>> .processInput(StreamOneInputProcessor.java:65)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>> StreamTask.java:396)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxLoop(MailboxProcessor.java:191)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .runMailboxLoop(StreamTask.java:617)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:581)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>>


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread Matthias Pohl
Hi,
I have a few questions about your case:
* What is the option you're referring to for the bounded shuffle? That
might help to understand what streaming mode solution you're looking for.
* What does the job graph look like? Are you assuming that it's due to a
shuffling operation? Could you provide the logs to get a better
understanding of your case?
* Do you observe the same memory increase for other TaskManager nodes?
* Are you expecting to reach the memory limits considering that you
mentioned a "big state size"? Would increasing the memory limit be an
option or do you fear that it's caused by some memory leak?

Best,
Matthias

On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:

> The Flink version we used is 1.12.0.
>
> 马阳阳
> ma_yang_y...@163.com
>
> 
> Signature customized with 网易邮箱大师 (NetEase Mail Master)
>
> On 04/16/2021 16:07,马阳阳 
> wrote:
>
> Hi, community,
> When running a Flink streaming job with big state size, one task manager
> process was killed by the yarn node manager. The following log is from the
> yarn node manager:
>
> 2021-04-16 11:51:23,013 WARN
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> Container
> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
> Killing container.
>
> When searching solution for this problem, I found that there is a option
> for this that worked for bounded shuffle. So is there a way to get rid of
> this in streaming mode?
>
> PS:
> memory related options:
> taskmanager.memory.process.size:12288m
> taskmanager.memory.managed.fraction:0.7
>
>


Re: Question about the upsert-kafka connector

2021-04-22 Thread Shengkai Fang
Hi,

Could you describe the specific problem you have in mind?

Best,
Shengkai

op <520075...@qq.com> wrote on Thu, Apr 22, 2021 at 6:05 PM:

> When the upsert-kafka connector is used as a source, can inserts and
> updates on a key arrive out of order and lead to incorrect results?
> Thanks


Question about the upsert-kafka connector

2021-04-22 Thread op
When the upsert-kafka connector is used as a source, can inserts and updates
on a key arrive out of order and lead to incorrect results? Thanks


flink1.12.2: interval join does not have inProcessingTime() and inEventTime()

2021-04-22 Thread tianxy

 

FLIP-134: Batch execution for the DataStream API
Allow explicitly configuring time behaviour on KeyedStream.intervalJoin()
FLINK-19479

Before Flink 1.12 the KeyedStream.intervalJoin() operation was changing
behavior based on the globally set Stream TimeCharacteristic. In Flink 1.12
we introduced explicit inProcessingTime() and inEventTime() methods on
IntervalJoin and the join no longer changes behaviour based on the global
characteristic.


In flink 1.12.2, I cannot find the corresponding inProcessingTime() and
inEventTime() methods on the interval join. Is this not supported yet? Hoping
for an answer!!!
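
For reference, a rough sketch of where these methods sit in the 1.12 API
(Order, Payment, the key selectors and the bounds are placeholders); if they
are missing, the flink-streaming-java dependency may still be on an older
version:

// inEventTime()/inProcessingTime() are defined on the object returned by between(...)
orders.keyBy(Order::getUserId)
        .intervalJoin(payments.keyBy(Payment::getUserId))
        .between(Time.minutes(-5), Time.minutes(5))
        .inEventTime()                       // or .inProcessingTime()
        .process(new ProcessJoinFunction<Order, Payment, String>() {
            @Override
            public void processElement(Order left, Payment right, Context ctx,
                    Collector<String> out) throws Exception {
                out.collect(left.getId() + " matched " + right.getId());
            }
        });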



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink1.12.2 with the RocksDB state backend: checkpoint size keeps growing

2021-04-22 Thread tianxy
452  COMPLETED  103/103  2021-04-22 17:29:12  2021-04-22 17:29:12  325ms  4.40 MB  0 B (5.39 KB)
451  COMPLETED  103/103  2021-04-22 17:28:12  2021-04-22 17:28:12  122ms  4.43 MB  9.36 KB (15.2 KB)
450  COMPLETED  103/103  2021-04-22 17:27:12  2021-04-22 17:27:12  124ms  4.43 MB  0 B (5.39 KB)
449  COMPLETED  103/103  2021-04-22 17:26:12  2021-04-22 17:26:12  112ms  4.48 MB  0 B (5.39 KB)
448  COMPLETED  103/103  2021-04-22 17:25:12  2021-04-22 17:25:12  117ms  4.35 MB  0 B (5.39 KB)
447  COMPLETED  103/103  2021-04-22 17:24:12  2021-04-22 17:24:12  113ms  4.41 MB  113 B (5.71 KB)
446  COMPLETED  103/103  2021-04-22 17:23:12  2021-04-22 17:23:12  108ms  4.42 MB  9 B (5.86 KB)
445  COMPLETED  103/103  2021-04-22 17:22:12  2021-04-22 17:22:12  130ms  4.47 MB  0 B (5.39 KB)
444  COMPLETED  103/103  2021-04-22 17:21:12  2021-04-22 17:21:12  155ms  4.35 MB  0 B (5.39 KB)
443  COMPLETED  103/103  2021-04-22 17:20:12  2021-04-22 17:20:12  119ms  4.45 MB  19.0 KB (46.1 KB)


When running flink 1.12.2 with RocksDB as the state backend, the checkpoint
size keeps growing. I am using an interval join, whose state should keep
expiring, so the size should hover around some value. The same program with
the FsStateBackend stays at a few hundred KB and does not keep growing. What
could be the reason? I originally used the FsStateBackend, but found that
after running for a while (e.g. a few hours) checkpoints suddenly start
failing, timing out because they cannot complete within 5 minutes. The logs
show no job error, only the following:

Checkpoint 19 of job dd7b3ab0ec365d23c5dfa25dcf53a730 expired before completing
java.util.concurrent.CancellationException: null

Any advice would be appreciated!
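
A minimal sketch of the RocksDB/incremental-checkpoint setup being discussed
(the checkpoint URI, interval and timeout are placeholders; Flink 1.12 API):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(60_000);
// give slow checkpoints more headroom if they time out
env.getCheckpointConfig().setCheckpointTimeout(15 * 60 * 1000);

// second argument enables incremental checkpoints (state.backend.incremental: true);
// the constructor may throw IOException
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

Note that with incremental checkpoints the reported size can grow for a
while until RocksDB compaction drops obsolete SST files, which is one common
explanation for a steadily growing size and not necessarily a leak.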



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Question: after enabling state.backend.incremental the Checkpointed Data Size keeps growing

2021-04-22 Thread tianxy
Hi, I ran into this as well. Have you figured out the cause of this problem?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


[ANNOUNCE] Flink Jira Bot fully live (& Useful Filters to Work with the Bot)

2021-04-22 Thread Konstantin Knauf
Hi everyone,

all of the Jira Bot rules are live now. Particularly in the beginning the
Jira Bot will be very active, because the rules apply to a lot of old,
stale tickets. So, if you get a huge amount of emails from the Flink Jira
Bot right now, this will get better. In any case, the Flink Jira Bot (or
the rules that it implements) demand some changes to how we work with Jira.

Here are a few points to make this transition easier for us:

*1) Retrospective*

In 3-4 weeks I would like to collect feedback. What is working well? What
is not working well or getting in your way? Is the bot moving us closer to
the goals mentioned in the initial email? Specifically, the
initial parameterization [1] of the bot was kind of an educated guess. I
will open a [DISCUSS]ion thread to collect feedback and proposals for
changes around that time.

*2) Use Sub-Tasks*

The bot will ask you for an update on assigned tickets after quite a short
time for Flink standards. If you are working on a ticket that takes longer,
consider splitting it up in Sub-Tasks. Activity on Sub-Tasks counts as
activity for the parent ticket, too. So, as long as any subtask is moving
along you won't be nagged by the bot.


*3) Useful Filters*

You've probably received a lot of emails already, in particular if you are
watching many tickets. Here are a few JIRA filters to find the tickets,
that are probably most important to you and have been updated by the bot:

Tickets that *you are assigned to*, which are "stale-assigned"

https://issues.apache.org/jira/issues/?filter=12350499

Tickets that *you reported*, which are stale in anyway:

https://issues.apache.org/jira/issues/?filter=12350500

If you are a maintainer of some components, you might find the following
filters useful (replace with your own components):

*All tickets that are about to be closed*
project = FLINK AND component in ("Build System", "BuildSystem / Shaded",
"Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
flink-docker, "Release System", "Runtime / Coordination", "Runtime /
Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
resolution = Unresolved AND labels in (stale-minor)

*Bugs that are about to be deprioritized or closed*
project = FLINK AND type = BUG AND component in ("Build System",
"BuildSystem / Shaded", "Deployment / Kubernetes", "Deployment / Mesos",
"Deployment / YARN", flink-docker, "Release System", "Runtime /
Coordination", "Runtime / Metrics", "Runtime / Queryable State", "Runtime /
REST", Travis) AND resolution = Unresolved AND labels in (stale-major,
stale-blocker, stale-critical, stale-minor)


*Tickets that are stale-assigned, but already have a PR available*project =
FLINK AND component in ("Build System", "BuildSystem / Shaded", "Deployment
/ Kubernetes", "Deployment / Mesos", "Deployment / YARN", flink-docker,
"Release System", "Runtime / Coordination", "Runtime / Metrics", "Runtime /
Queryable State", "Runtime / REST", Travis) AND resolution = Unresolved AND
labels in (stale-assigned) AND labels in (pull-request-available)

Cheers,

Konstantin

[1] https://github.com/apache/flink-jira-bot/blob/master/config.yaml


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


When using processElement() and onTimer() in a ProcessFunction, do I need to consider concurrency issues?

2021-04-22 Thread x2009438
As per the subject line. Thanks, everyone.


Sent from my iPhone

flink mysql cdc????

2021-04-22 Thread ????
??flink mysql cdc
1.flink mysql 
cdc??mysql??binlog??mysql
 

Re: flink 1.12.2 sql-cli error writing to Hive: is_generic

2021-04-22 Thread Rui Li
Could you share the exact SQL statements (both the DDL and the insert)?

On Wed, Apr 21, 2021 at 5:46 PM HunterXHunter <1356469...@qq.com> wrote:

> I set a watermark in the DDL, but when I check the watermark on the job page, it never updates.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Re: Table name information missing from flink sql cdc messages sent to Kafka

2021-04-22 Thread ????
flink-cdcSourceRecord??SourceRecord??topic??
??Debezium 
mysql-conectorkafka-connectortopic??
?? ??+??+topic?? 
SourceRecord??topic



----
??: 
   "user-zh"



Re: Re: Why does a flink sql job consuming kafka and joining a regular table ramp up so slowly?

2021-04-22 Thread Xi Shen
The cache size is set to 20,000 and the TTL to 2 hours.
The whole table is only about 30,000 rows and a dozen or so MB in total, so
I will try setting the cache size to 40,000 so that the entire table fits in
the cache, and see whether that helps.


But what I have found so far makes me suspect it is related to the savepoint:
- If I set the kafka offset to earliest and restart without a savepoint, the
job starts consuming with a lag of about 50 million, but reaches a
consumption rate of about 7k/s within one minute. As shown in the chart, the
job started at 14:31; the very high initial rate is due to the offset reset,
and by 14:33 it had already reached 7.5k/s.
- But if I start with the savepoint, it takes 35 minutes to reach that
consumption rate, as shown in the second chart.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Why does a flink sql job consuming kafka and joining a regular table ramp up so slowly?

2021-04-22 Thread Xi Shen
Reads of the JDBC table are cached; looking at the source code, it is
implemented with a Guava cache.

The docs say the whole TaskManager process shares one cache, so the effect
should be the same as broadcasting the table? So the performance problem
should not be caused by querying TiDB.
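
For reference, a sketch of how the lookup-cache options discussed here
appear on a JDBC dimension table, wrapped in a Java call (connection details
and the table schema are placeholders; tableEnv is an existing
TableEnvironment):

tableEnv.executeSql(
        "CREATE TABLE dim_table (" +
        "  id INT," +
        "  name STRING," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://tidb-host:4000/db'," +
        "  'table-name' = 'dim_table'," +
        "  'lookup.cache.max-rows' = '40000'," +   // large enough for the whole ~30k-row table
        "  'lookup.cache.ttl' = '2h'" +
        ")");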



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: Table name information missing from flink sql cdc messages sent to Kafka

2021-04-22 Thread casel.chen



My question is exactly why flink cdc, after embedding debezium, loses the
original information. Of course I could use native debezium or canal to sync
the data directly, but if flink cdc could emit it directly, wouldn't that
save those extra components and the operational overhead? That is also what
flink cdc was designed for.













On 2021-04-22 11:01:22, "飞翔" wrote:

In that case, why use flink to sync the data at all, since the original
information gets lost? You can use native debezium or canal directly to sync
the data and send it to kafka. Take canal's output as an example: although
the `after` part is not complete, you can reconstruct it yourself. Or simply
use debezium, which is exactly why flink-cdc embeds debezium: the before and
after images of an update form a complete record.





-- Original message --
From: "user-zh" ;
Sent: Thursday, April 22, 2021, 9:41 AM
To: "user-zh@flink.apache.org";
Subject: Table name information missing from flink sql cdc messages sent to Kafka


I recently have a requirement to use flink to read the mysql binlog and
forward the change records to downstream kafka (similar to what canal server
or debezium does). The downstream kafka messages need the before, after,
op_type, ts, database and table fields. With the script below, the resulting
kafka messages only contain the data and op_type information; the other
fields cannot be obtained. Tracing upstream, the records emitted by debezium
(flink cdc is implemented on top of debezium) themselves only carry data and
op_type. Is there any other way to obtain the original change record?


CREATE TABLE `binlog_table` (
`id` INT,
`name` STRING,
`sys_id` STRING,
`sequence` INT,
`filter` STRING,
`tag` STRING,
`remark` STRING,
`create_date` TIMESTAMP,
`update_date` TIMESTAMP,
`reserve` STRING,
`sys_name` STRING,
`metric_seq` INT,
`advanced_function` STRING,
`value_type` STRING,
`value_field` STRING,
`status` INT,
`syn_date` TIMESTAMP,
`confirmer` STRING,
`confirm_time` TIMESTAMP,
`index_explain` STRING,
`field_name` STRING,
`tag_values` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '${mysql.hostname}',
  'port' = '3306',
  'username' = '${mysql.username}',
  'password' = '${mysql.password}',
  'database-name' = '${mysql.database}',
  'table-name' = '${mysql.table}'
  );


CREATE TABLE `kafka_sink` (
  `id` INT,
  `name` STRING,
  `sys_id` STRING,
  `sequence` INT,
  `filter` STRING,
  `tag` STRING,
  `remark` STRING,
  `create_date` TIMESTAMP,
  `update_date` TIMESTAMP,
  `reserve` STRING,
  `sys_name` STRING,
  `metric_seq` INT,
  `advanced_function` STRING,
  `value_type` STRING,
  `value_field` STRING,
  `status` INT,
  `syn_date` TIMESTAMP,
  `confirmer` STRING,
  `confirm_time` TIMESTAMP,
  `index_explain` STRING,
  `field_name` STRING,
  `tag_values` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '${topic}',
  'properties.bootstrap.servers' = '${bootstrap.servers}',
  'format' = 'canal-json'
  );


INSERT INTO `kafka_sink`
(SELECT *
 FROM `binlog_table`);







Re: Multiple select queries in single job on flink table API

2021-04-22 Thread tbud
"TableResult result1 = stmtSet.execute();
result1.print();"

I tried this, and the result is the following:
Job has been submitted with JobID 4803aa5edc31b3ddc884f922008c5c03
+---------------------------------------------+---------------------------------------------+
| default_catalog.default_database.output1_1 | default_catalog.default_database.output1_2 |
+---------------------------------------------+---------------------------------------------+
|                                          -1 |                                          -1 |
+---------------------------------------------+---------------------------------------------+
1 row in set

Output folder is created but with empty files.

When I run my queries individually it gives me 68 and 107 rows respectively.
The problem starts happening when I add two statements to a StatementSet.
Has anybody faced this issue? What was the solution?
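
For comparison, a bare-bones version of the two-insert pattern that is
expected to work (table names and queries are placeholders, not the original
job's; tEnv is an existing TableEnvironment):

StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO output1_1 SELECT id, name FROM source_table WHERE kind = 'a'");
stmtSet.addInsertSql("INSERT INTO output1_2 SELECT id, name FROM source_table WHERE kind = 'b'");

TableResult result = stmtSet.execute();
result.await();   // available from Flink 1.12: block until the submitted job finishes

The -1 values printed above only indicate that the affected row count is
unknown for the asynchronously submitted job, not that nothing was written;
with a streaming filesystem sink, part files are typically only finalized on
checkpoints, which is one common reason for seeing empty files right after
submission.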



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Re: Kafka fetch rate in Flink streaming/batch scenarios: how many records per fetch? Is it dynamic or configurable?

2021-04-22 Thread 李一飞
Got it, thanks~
On 2021-04-21 19:58:23, "Peihui He" wrote:
>fetch.min.bytes
>fetch.wait.max.ms
>You can also use these two parameters to control it.
>
>熊云昆  wrote on Wed, Apr 21, 2021 at 7:10 PM:
>
>> There is the parameter max.poll.records, which is the maximum number of
>> records fetched in one poll; the default is 500.
>>
>>
>> | |
>> 熊云昆
>> |
>> |
>> Email: xiongyun...@163.com
>> |
>>
>> Signature customized with 网易邮箱大师 (NetEase Mail Master)
>>
>> On 2021-04-20 18:19, 李一飞 wrote:
>> Kafka fetch rate in Flink streaming/batch scenarios: how many records are
>> fetched per batch? Is it dynamic or can it be configured?
>> Ideally answer for the streaming and batch scenarios separately. Thanks!
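
For reference, a small sketch of passing the consumer options mentioned
above to the Flink Kafka source (broker address, group id, topic and schema
are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "my-group");
props.setProperty("max.poll.records", "500");   // max records returned per poll (default 500)
props.setProperty("fetch.min.bytes", "1");
props.setProperty("fetch.max.wait.ms", "500");  // the modern client name for fetch.wait.max.ms

FlinkKafkaConsumer<String> source =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
env.addSource(source);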


Re: Re: Kafka fetch rate in Flink streaming/batch scenarios: how many records per fetch? Is it dynamic or configurable?

2021-04-22 Thread 李一飞
Thanks
On 2021-04-21 19:10:17, "熊云昆" wrote:
>There is the parameter max.poll.records, which is the maximum number of
>records fetched in one poll; the default is 500.
>
>
>| |
>熊云昆
>|
>|
>Email: xiongyun...@163.com
>|
>
>Signature customized with 网易邮箱大师 (NetEase Mail Master)
>
>On 2021-04-20 18:19, 李一飞 wrote:
>Kafka fetch rate in Flink streaming/batch scenarios: how many records are
>fetched per batch? Is it dynamic or can it be configured?
>Ideally answer for the streaming and batch scenarios separately. Thanks!