Re: ctx.timestamp() returning null when using Processing Time

2019-11-06 Thread Yun Tang
Hi Komal

Please read the Javadoc of BaseContext#timestamp() [1] carefully: it returns 
null if your program is set to 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime. If you want 
to fetch the current processing timestamp, please use 
`ctx.currentProcessingTime()`.

[1] 
https://github.com/apache/flink/blob/9b43f13a50848382fbd634081b82509f464e62ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java#L50
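
A minimal sketch of the idea, not taken from the thread: derive the cleanup 
time from the processing-time clock instead of from ctx.timestamp(). The 
Context interface and the fixedClock helper are hypothetical stand-ins so the 
snippet compiles without Flink on the classpath. In the real function the 
result would presumably be passed to 
ctx.timerService().registerProcessingTimeTimer(...) rather than to 
registerEventTimeTimer(...).

```java
/** Sketch only: models the two context methods discussed in this thread. */
final class ProcessingTimeTimerSketch {

    /** Hypothetical stand-in for the Flink context; not the real Flink type. */
    public interface Context {
        Long timestamp();             // element timestamp, null under ProcessingTime
        long currentProcessingTime(); // current wall-clock time of the operator
    }

    /** Computes the time at which the cleanup timer should fire. */
    public static long cleanupTimerTime(Context ctx, long delayMillis) {
        // Deliberately avoids ctx.timestamp(): unboxing a null Long would throw
        // the NullPointerException reported in the original question.
        return ctx.currentProcessingTime() + delayMillis;
    }

    /** Builds a fake context with a fixed clock, for demonstration only. */
    public static Context fixedClock(long now) {
        return new Context() {
            @Override public Long timestamp() { return null; }
            @Override public long currentProcessingTime() { return now; }
        };
    }
}
```

With a fixed clock of 1000 ms and a 3000 ms delay, cleanupTimerTime yields 
4000, which is the value one would register as a processing-time timer.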
Best
Yun Tang

From: Komal Mariam 
Date: Wednesday, November 6, 2019 at 6:19 PM
To: user 
Subject: ctx.timestamp() returning null when using Processing Time

Dear all,

I want to clear some of my variables in KeyedBroadcastProcessFunction after a 
certain time. I implemented the onTimer() function but even though I am using 
ProcessingTime
like so: env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime), I 
am getting null when ctx.timestamp() is called.

How do I ensure that some of the variables or state inside 
KeyedBroadcastProcessFunction are cleared after a certain time interval (say 3 
seconds)?

Here is a skeleton of what it looks like. I am using Flink 1.9.


public static class myFunction extends KeyedBroadcastProcessFunction, List>> {

    List fixedPoints;

    public void processBroadcastElement(
            Point myPoint,
            Context ctx,
            Collector, List>> out) throws Exception {
        /* put myPoint in broadcastState */
    }

    public void processElement(
            Point queryPoint,
            ReadOnlyContext ctx,
            Collector, List>> out) throws Exception {

        /* collect output */

        // returns "TimeStamp: null"
        System.out.println("TimeStamp: " + ctx.timestamp());
        // java.lang.NullPointerException
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000);
    }

    // does not run due to java.lang.NullPointerException
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector, List>> out) throws Exception {
        System.out.println("Clearing...");
        fixedPoints.clear();
        System.out.println("Clearing...COMPLETE");
    }
}

When I change to env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
I get timestamps like these
TimeStamp: 1573016104289
TimeStamp: 1573016104294
TimeStamp: 1573016104292

however the onTimer() function is never called and fixedPoints is not cleared.

My datastreams right now are very limited. keyedStream has 8 elements while 
broadcast stream has 7.

I would really appreciate any help!

Best Regards,
Komal



Re: Flink savepoint(checkpoint) recovery dev debug

2019-11-06 Thread Yun Tang
Hi 

The entry point for restoring a savepoint is CheckpointCoordinator#restoreSavepoint 
[1]. I hope this helps.

[1] 
https://github.com/apache/flink/blob/9b43f13a50848382fbd634081b82509f464e62ca/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1173

Best
Yun Tang

On 11/6/19, 5:18 PM, "qq" <471237...@qq.com> wrote:

Hi all.

   I want to simulate the shell command "flink -s savepoint". This command can 
only be run from a shell, but I want to debug it in my local development 
environment. Could anyone help me? Thanks very much. So far I can only use 
Savepoint.load to read the savepoint metadata and data, but I want to run the 
program like "flink -s" during development (launched from code).





Re: Does changing the number of YARN containers matter when restoring from state?

2019-11-04 Thread Yun Tang
Hi

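First, check whether the job keeps failing over and whether there are any 
exceptions related to "maximum parallelism". If there are, the changed 
parallelism is incompatible and the job has never actually restored from the 
checkpoint.

If the job did restore from the checkpoint successfully, check whether the 
tasks are still restoring state because of the parallelism change. If your 
state is large, this step can take a long time. Typically in this situation 
the source has consumed data but cannot send it downstream, so the whole job 
looks stuck. You can run jstack on the task side and check whether a 
restore-related stack is hanging.

If neither of the above applies, please use jstack to see what the CPU is 
doing in the source and downstream tasks, and decide the next step from there.

Best
唐云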


From: "wangl...@geekplus.com.cn" 
Date: Tuesday, November 5, 2019 at 11:48 AM
To: user 
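Subject: Does changing the number of YARN containers matter when restoring from state?


We consume data from RocketMQ and process it.
The maximum parallelism in the code is 8. Submitting the job with -ys 4 automatically allocates 2 containers as TaskManagers.
After running for a while, the job is stopped with a savepoint.
When restoring from the savepoint with -ys 2, 4 containers are allocated as TaskManagers, but after submission the program 
no longer consumes data from RocketMQ and the consumption TPS stays at 0. What could be the reason?


Thanks,
王磊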



wangl...@geekplus.com.cn


Re: Checkpoint in FlinkSQL

2019-11-04 Thread Yun Tang
Hi Simon

If you are using the Table API, you can set the state backend via the 
environment, e.g. `env.setStateBackend()`.

If you just launch a cluster with the SQL client, you can configure the state 
backend and checkpoint options [1] in `flink-conf.yaml` before launching the 
cluster.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#checkpointing

Best
Yun Tang

From: Simon Su 
Date: Tuesday, November 5, 2019 at 10:38 AM
To: dev , user 
Subject: Checkpoint in FlinkSQL

Hi All

Does Flink currently support setting checkpoint properties when using Flink SQL?
For example, state backend choice, checkpoint interval, and so on.

Thanks,
Simon



Re: Using RocksDB as lookup source in Flink

2019-11-04 Thread Yun Tang
Hi Srikanth

Since RocksDB is a single-node DB, much like InfluxDB, I recommend referring to 
the implementation of the InfluxDB sink. [1]

[1] https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb

Best
Yun Tang


From: OpenInx 
Date: Monday, November 4, 2019 at 6:28 PM
To: srikanth flink 
Cc: user 
Subject: Re: Using RocksDB as lookup source in Flink

Hi

The Kafka table source & sink connector has been implemented (at least Flink 1.9 
supports it), but a RocksDB connector is not supported yet, so you may need to 
implement it yourself. Here [1] we have a brief wiki page showing which 
interfaces need to be implemented, but personally I don't think it is detailed 
enough. I think the existing Kafka connector code will be helpful for 
implementing your RocksDB sink [2].

[1]. 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html#define-a-tablesink
[2]. 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java

On Mon, Nov 4, 2019 at 6:05 PM srikanth flink <flink.d...@gmail.com> wrote:
Hi there,

Can someone help me implement a pipeline from a Kafka source to a RocksDB sink 
in Flink, so that I can use a UDF to look up RocksDB in SQL queries?

Context: I receive a list of IP addresses in one stream, which I wish to store 
in RocksDB, so that the other stream can perform a lookup to match the IP address.


Thanks
Srikanth


Re: Checkpoint failed all the time

2019-11-03 Thread Yun Tang

Sure, this feature was implemented in FLINK-12364 [1]. All you need to do is 
set the tolerable checkpoint failure number, for example:

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);

[1] https://issues.apache.org/jira/browse/FLINK-12364
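
To make the intended behaviour concrete, here is a toy model of a failure 
threshold. It is a sketch under an assumption: that failures are counted 
consecutively and that a successful checkpoint resets the counter. The class 
and method names are made up for illustration and are not Flink's internal 
implementation.

```java
/** Toy model of a tolerable-checkpoint-failure threshold (not Flink code). */
final class TolerableFailureSketch {
    private final int tolerableFailures;
    private int consecutiveFailures = 0;

    public TolerableFailureSketch(int tolerableFailures) {
        this.tolerableFailures = tolerableFailures;
    }

    /** Records a failed checkpoint; returns true when the job should be failed. */
    public boolean onCheckpointFailure() {
        consecutiveFailures++;
        return consecutiveFailures > tolerableFailures;
    }

    /** Assumption: a completed checkpoint resets the failure counter. */
    public void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }
}
```

With a threshold of 2, the first two failures in a row are tolerated and the 
third one trips the threshold; a success in between starts the count again.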

Best
Yun Tang



From: "slle...@aliyun.com.INVALID" 
Reply-To: "user-zh@flink.apache.org" 
Date: Monday, November 4, 2019 at 12:02 PM
To: "user-zh@flink.apache.org" 
Subject: Checkpoint failed all the time

Checkpoints can fail for various reasons.
Is there a way to have the job fail automatically after checkpoints have 
failed a certain number of times?




Re: Flink state TTL expiry cleanup question

2019-10-31 Thread Yun Tang
Hi 王磊

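Judging from your configuration and the fact that you are on Flink 1.7, active 
cleanup of expired data [1] is not enabled. I suggest configuring 
cleanupFullSnapshot on the StateTtlConfig, so that when you take a full 
snapshot (that is, a savepoint), expired data is not written into the 
savepoint. Without active cleanup, expired data is only cleaned up when it is 
read, which is probably why your savepoint keeps growing.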
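
The difference between lazy cleanup and filtering at snapshot time can be 
illustrated with a toy model. This is only a sketch of the reasoning above, 
not how Flink implements TTL; the class, its methods, and the use of plain 
maps are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model: lazy TTL cleanup versus dropping expired entries in a snapshot. */
final class TtlSnapshotSketch {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> expiryByKey = new HashMap<>(); // key -> expiry time
    private final long ttlMillis;

    public TtlSnapshotSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    public void put(String key, String value, long now) {
        values.put(key, value);
        expiryByKey.put(key, now + ttlMillis);
    }

    /** Lazy cleanup: an expired entry is physically removed only when it is read. */
    public String get(String key, long now) {
        Long expiry = expiryByKey.get(key);
        if (expiry == null) {
            return null;
        }
        if (expiry <= now) {
            values.remove(key);
            expiryByKey.remove(key);
            return null;
        }
        return values.get(key);
    }

    /** Number of entries physically stored, whether expired or not. */
    public int storedSize() {
        return values.size();
    }

    /** Snapshot without any filtering: expired entries are written out too. */
    public Map<String, String> snapshotAll() {
        return new HashMap<>(values);
    }

    /** Snapshot that skips expired entries; the live state itself is untouched. */
    public Map<String, String> snapshotWithoutExpired(long now) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : values.entrySet()) {
            if (expiryByKey.get(e.getKey()) > now) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```

In this model an entry that is never read again stays in snapshotAll() 
forever, which mirrors the growing savepoint, while snapshotWithoutExpired() 
leaves it out of the snapshot.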

Also, I recommend upgrading to Flink 1.8+, which has better support for state TTL; see the Chinese article [2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
[2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv

Best
唐云


On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn"  
wrote:

flink-1.7.2, using the following cleanup policy:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(3))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor descriptor = new 
ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
descriptor.enableTimeToLive(ttlConfig);

When updating the program I use savepoints: flink cancel -s saves to the savepoint directory, and then I restore from that directory.
My program has been running for a while, far longer than 3 days, and the savepoint directory produced by each flink cancel -s keeps growing. Is the expiry cleanup policy not taking effect?

Thanks,
王磊



wangl...@geekplus.com.cn




Re: How to filter out abnormal timestamps?

2019-10-31 Thread Yun Tang
Hi 瑞斌

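When downstream operators use window operations, the timestamp they rely on is 
this ingestion time. If your message has a field with "event time" semantics, 
you can then compare the ingestion time generated at the source with the time 
that field represents.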



On 10/31/19, 10:45 AM, "邢瑞斌"  wrote:

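Hi 唐云,

Thanks for the pointer, I will give it a try. I don't really understand the 
TimeCharacteristic set on the env. My understanding was that this setting 
applies globally. If I set it to IngestionTime, can downstream operators still 
use EventTime?

On Thu, Oct 31, 2019 at 2:26 AM, Yun Tang wrote: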

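> Hi 瑞斌
>
> If you use Flink's IngestionTime, there is no mixing with the EventTime
> provided by Flink. The IngestionTime at the source is simply the system time
> of the source. You can add a filter operator right after the source that
> compares the ingestion time with the event time in the message and drops
> records that exceed a certain threshold instead of passing them downstream.
>
> Best
> 唐云
>
> From: 邢瑞斌
> Sent: Wednesday, October 30, 2019 17:57
> To: user-zh@flink.apache.org
> Subject: How to filter out abnormal timestamps?
>
> Hi:
>
> When collecting log data from clients, we always run into some abnormal
> local timestamps; some are off from the correct date by many days. Such
> timestamps affect the watermark. How do you deal with logs like this?
>
> My current idea is:
>
> Compare the log time with Flink's time and filter the record out if the
> difference exceeds a threshold. But this seems to make the processing
> results non-deterministic. The improvement I have in mind is to compare
> IngestionTime with the log's timestamp. However, I am not sure whether
> IngestionTime and EventTime can be mixed.
>
> Any advice is appreciated. Thanks, everyone!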
>




Re: Flink SQL + savepoint

2019-10-31 Thread Yun Tang
Hi Fanbin

If you do not change the parallelism or add and remove operators, you can 
still use savepoints to resume your jobs with Flink SQL.

However, as far as I know, Flink SQL might not currently configure the uid, and 
I'm pretty sure the blink branch contains the part that sets the uid on stream 
nodes. [1]

I have CC'd Kurt, as he can provide more details on this.

[1] 
https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/util/resource/StreamNodeUtil.java#L44

Best
Yun Tang


From: Fanbin Bu 
Date: Thursday, October 31, 2019 at 1:17 PM
To: user 
Subject: Flink SQL + savepoint

Hi,

it is highly recommended that we assign the uid to the operator for the sake of 
savepoint. How do we do this for Flink SQL? According to 
https://stackoverflow.com/questions/55464955/how-to-add-uid-to-operator-in-flink-table-api,
 it is not possible.

Does that mean, I can't use savepoint to restart my program if I use Flink SQL?

Thanks,

Fanbin


Re: How to filter out abnormal timestamps?

2019-10-30 Thread Yun Tang
Hi 瑞斌

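If you use Flink's IngestionTime, there is no mixing with the EventTime 
provided by Flink. The IngestionTime at the source is simply the system time of 
the source. You can add a filter operator right after the source that compares 
the ingestion time with the event time in the message and drops records that 
exceed a certain threshold instead of passing them downstream.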
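
The comparison itself can be sketched as a plain predicate. This is an 
illustrative sketch only: the class name, method names, and the threshold 
parameter are assumptions, and how the ingestion time is obtained inside the 
operator is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a timestamp sanity check; names and threshold are illustrative. */
final class TimestampSanityFilter {

    /** True when the event time lies within maxSkewMillis of the ingestion time. */
    public static boolean isPlausible(
            long eventTimeMillis, long ingestionTimeMillis, long maxSkewMillis) {
        return Math.abs(ingestionTimeMillis - eventTimeMillis) <= maxSkewMillis;
    }

    /** Keeps only the event times that pass the check, mimicking a filter operator. */
    public static List<Long> keepPlausible(
            List<Long> eventTimes, long ingestionTimeMillis, long maxSkewMillis) {
        List<Long> kept = new ArrayList<>();
        for (long eventTime : eventTimes) {
            if (isPlausible(eventTime, ingestionTimeMillis, maxSkewMillis)) {
                kept.add(eventTime);
            }
        }
        return kept;
    }
}
```

A record whose event time is several days away from the ingestion time fails 
the check with a one-day threshold and would be dropped before it can 
influence the watermark.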

Best
唐云

From: 邢瑞斌 
Sent: Wednesday, October 30, 2019 17:57
To: user-zh@flink.apache.org 
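Subject: How to filter out abnormal timestamps?

Hi:

When collecting log data from clients, we always run into some abnormal local 
timestamps; some are off from the correct date by many days. Such timestamps 
affect the watermark. How do you deal with logs like this?

My current idea is:

Compare the log time with Flink's time and filter the record out if the 
difference exceeds a threshold. But this seems to make the processing results 
non-deterministic. The improvement I have in mind is to compare IngestionTime 
with the log's timestamp. However, I am not sure whether IngestionTime and 
EventTime can be mixed.

Any advice is appreciated. Thanks, everyone!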


Re: standalone flink savepoint restoration

2019-10-16 Thread Yun Tang
Hi Matt

Have you configured `high-availability.cluster-id`? If not, a Flink standalone 
job first tries to recover from the high-availability checkpoint store named 
'/default'. If a checkpoint exists there, Flink always restores from it with 
'allowNonRestoredState' disabled [1] (it always passes 'false'). Please 
consider setting `high-availability.cluster-id` to a different value, so that 
you can resume the job after dropping some operators.


[1] 
https://github.com/apache/flink/blob/7670e237d7d8d3727537c09b8695c860ea92d467/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java#L190

Best
Yun Tang

From: Matt Anger 
Sent: Thursday, October 17, 2019 5:46
To: user@flink.apache.org 
Subject: standalone flink savepoint restoration

Hello everyone,
I am running a flink job in k8s as a standalone HA job. Now I updated my job w/ 
some additional sinks, which I guess have made the checkpoints incompatible 
with the newer version, meaning flink now crashes on bootup with the following:
 Caused by: java.lang.IllegalStateException: There is no operator for the state 
c9b81dfc309f1368ac7efb5864e7b693

So I rolled back the deployment, logged into the pod, created a savepoint, and 
then modified my args to add

--allowNonRestoredState
and
-s 

but it doesn't look like the standalone cluster is respecting those arguments. 
I've tried searching around, but haven't found any solutions. The docker image 
I have is running the docker-entrypoint.sh and the full arg list is below, 
copy-pasted from my k8s yaml file:

- job-cluster
- -Djobmanager.rpc.address=$(SERVICE_NAME)
- -Djobmanager.rpc.port=6123
- -Dresourcemanager.rpc.port=6123
- -Dparallelism.default=$(NUM_WORKERS)
- -Dblob.server.port=6124
- -Dqueryable-state.server.ports=6125
- -Ds3.access-key=$(AWS_ACCESS_KEY_ID)
- -Ds3.secret-key=$(AWS_SECRET_ACCESS_KEY)
- -Dhigh-availability=zookeeper
- -Dhigh-availability.jobmanager.port=50010
- -Dhigh-availability.storageDir=$(S3_HA)
- -Dhigh-availability.zookeeper.quorum=$(ZK_QUORUM)
- -Dstate.backend=filesystem
- -Dstate.checkpoints.dir=$(S3_CHECKPOINT)
- -Dstate.savepoints.dir=$(S3_SAVEPOINT)
- --allowNonRestoredState
- -s $(S3_SAVEPOINT)

I originally didn't have the last 2 args, I added them based upon various 
emails I saw on this list and other google search results, to no avail.

Thanks
-Matt


Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-15 Thread Yun Tang
Hi Steven

If you restored the savepoint/checkpoint successfully, I think this might be 
because the shard was not discovered in the previous run, and therefore it is 
consumed from the beginning. Please refer to the implementation here: [1]

[1] 
https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307

Best
Yun Tang

From: Steven Nelson 
Sent: Wednesday, October 16, 2019 4:31
To: user 
Subject: Kinesis Connector and Savepoint/Checkpoint restore.

Hello, we currently use Flink 1.9.0 with Kinesis to process data.

We have extended data retention on the Kinesis stream, which gives us 7 days of 
data.

We have found that when a savepoint/checkpoint is restored that it appears to 
be restarting the Kinesis Consumer from the start of the stream. The 
flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
 property reports to Prometheus that it is behind by 7 days when the process 
starts back up from a savepoint.

We have some logs that say:

Subtask 3 will start consuming seeded shard 
StreamShardHandle{streamName='TheStream', shard='{ShardId: 
shardId-0083,HashKeyRange: {StartingHashKey: 
220651847300296034902031972006537199616,EndingHashKey: 
223310303291865866647839586127097888767},SequenceNumberRange: 
{StartingSequenceNumber: 
49597946220601502339755334362523522663986150244033234226,}}'} from sequence 
number EARLIEST_SEQUENCE_NUM with ShardConsumer 20

This seems to indicate that this shard is starting from the beginning of the 
stream

and some logs that say:
Subtask 3 will start consuming seeded shard StreamShardHandle{streamName=' 
TheStream ', shard='{ShardId: shardId-0087,HashKeyRange: 
{StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey: 
233944127258145193631070042609340645375},SequenceNumberRange: 
{StartingSequenceNumber: 
49597946220690705320549456855089665537076743690057155954,}}'} from sequence 
number 49599841594208637293623823226010128300928335129272649074 with 
ShardConsumer 21

This shard seems to be resuming from a specific point.

I am assuming that this might be caused by no data being available on the shard 
for the entire stream (possible with this application stage). Is this the 
expected behavior? I had thought it would checkpoint with the most recent 
sequence number, regardless of if it got data or not.

-Steve




Re: Flink restoring a job from a checkpoint

2019-10-11 Thread Yun Tang
Hi Flavio

If you hit this problem without ever having triggered a savepoint, first of 
all please ensure that your checkpoints are retained [1]. Once your job fails 
due to a problematic message, you need to cancel the job and modify it so that 
it does not fail over when it meets that problematic message again. Then just 
submit the job to resume from your last checkpoint [2]. This is the general 
solution for dealing with unrecoverable problems.
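
One possible way to make the job tolerate a problematic message is to catch 
the parsing error and skip the record instead of letting the exception 
escape. The sketch below uses plain Java with made-up names; in a Flink job 
the same pattern would sit inside whichever function parses the records, and 
the integer parsing is only a placeholder for the real deserialization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Sketch: skip malformed records instead of failing the whole job. */
final class SkipBadRecordsSketch {

    /** Parses one raw message; returns empty instead of throwing on bad input. */
    public static Optional<Integer> parseOrSkip(String raw) {
        try {
            return Optional.of(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException | NullPointerException e) {
            // A real job would log the bad record or route it to a side channel.
            return Optional.empty();
        }
    }

    /** Processes a batch, emitting only the records that parsed successfully. */
    public static List<Integer> process(List<String> rawMessages) {
        List<Integer> out = new ArrayList<>();
        for (String raw : rawMessages) {
            parseOrSkip(raw).ifPresent(out::add);
        }
        return out;
    }
}
```

Feeding the inputs "1", "oops" and " 3 " through process keeps 1 and 3 and 
silently drops the malformed middle record.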


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint

Best
Yun Tang

From: Congxian Qiu 
Sent: Friday, October 11, 2019 19:47
To: Flavio Pompermaier 
Cc: Yun Tang ; theo.diefent...@scoop-software.de 
; user 
Subject: Re: Flink restoring a job from a checkpoint

I don't think scheduling savepoints periodically is better than periodic 
checkpoints (which Flink has out of the box).

1. Savepoints and checkpoints share the same code path, except that a savepoint 
always does a full snapshot while a checkpoint can do an incremental snapshot. 
If a checkpoint cannot be completed, a savepoint cannot be completed either.

2. Checkpoints are already periodic in Flink.

3. A savepoint is always a full snapshot, which means it may be slow. A 
checkpoint can be incremental, and an incremental checkpoint is much faster 
than a savepoint.

Best,
Congxian


On Fri, Oct 11, 2019 at 5:24 PM, Flavio Pompermaier <pomperma...@okkam.it> 
wrote:
If I understood correctly you're saying that in this case I'd need to reprocess 
all messages from scratch (unless I retain my checkpoints..), right?
Could it be a good strategy to schedule savepoints periodically to avoid such 
situations? Is there any smarter solution to this?

On Fri, Oct 11, 2019 at 4:45 AM Yun Tang <myas...@live.com> wrote:
A checkpoint can only complete if your job has not failed. Since checkpoint 
barriers are injected into the stream together with the messages, if the 
problematic message causes your job to fail, no checkpoint can complete after 
that message has been processed. In other words, you can always resume your job 
from a Kafka offset before that problematic message.

Best
Yun Tang

From: Flavio Pompermaier <pomperma...@okkam.it>
Sent: Friday, October 11, 2019 5:50
To: Yun Tang <myas...@live.com>
Cc: Congxian Qiu <qcx978132...@gmail.com>; theo.diefent...@scoop-software.de 
<theo.diefent...@scoop-software.de>; user <user@flink.apache.org>
Subject: Re: Flink restoring a job from a checkpoint

Sorry for the dumb question, but suppose I do not use retained checkpoints and 
my job has processed billions of messages from Kafka. Then a problematic 
message causes my job to fail. Am I able to complete a savepoint, fix the job, 
and restart from the problematic message (i.e. the last "valid" Kafka offset)?

On Thu, Oct 10, 2019 at 20:01, Yun Tang <myas...@live.com> wrote:
Hi Vishwas

Imagine this scenario: your last committed offset is A, with a savepoint-A. 
You then stop the job and try some new program logic, such as printing your 
output instead of writing to the previous sink, to do some experiments. The new 
experimental job might commit offset-B to Kafka. Once verified, you still need 
to resume from Kafka offset-A to ensure all data has been written to the target 
sink. This is easier if you just restore the job from savepoint-A.

In other words, Flink already provides a stronger and more flexible mechanism 
for resuming Kafka offsets, so why not use it?

Best
Yun Tang

From: Congxian Qiu <qcx978132...@gmail.com>
Sent: Thursday, October 10, 2019 11:52
To: theo.diefent...@scoop-software.de <theo.diefent...@scoop-software.de>
Cc: user <user@flink.apache.org>
Subject: Re: Flink restoring a job from a checkpoint

Hi Vishwas

Sorry for the confusion; what Theo said previously is what I meant. What I 
said earlier was from Flink's side: if we do not restore from a 
checkpoint/savepoint, none of the TMs will have any state, so the job starts 
from scratch.

Best,
Congxian


On Thu, Oct 10, 2019 at 1:15 AM, theo.diefent...@scoop-software.de 
<theo.diefent...@scoop-software.de> wrote:
Hi Vishwas,

With "from scratch", Congxian means that Flink won't load any state 
automatically and starts as if there was no state. Of course if the kafka 
consumer group already exists and you have configured Flink to start from group 
offsets if there is no state yet, it will start from the group offsets.

I think your approach is totally fine. Ignoring savepoints and don't retaining 
checkpoints saves overhead and con

Re: Question about enabling state.backend.rocksdb.ttl.compaction.filter.enabled with RocksDBStateBackend

2019-10-11 Thread Yun Tang
Hi

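I think your concern is that the data fills up the disk before the TTL has 
expired. That is certainly not a problem TTL can cover. At the job level, try 
to limit the write volume, or increase the parallelism to reduce the amount of 
data each RocksDB instance has to hold (provided that the total disk space of 
all your machines is larger than your data volume). In addition, if you are 
really worried, switching to a compression algorithm that produces smaller 
output also helps somewhat (at the cost of more time and CPU; RocksDB 
officially recommends ZSTD or Zlib) [1]. To set the compression type, see the 
setCompressionType method of RocksDB's ColumnFamilyOptions [2].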

[1] https://github.com/facebook/rocksdb/wiki/Compression#configuration
[2] 
https://github.com/facebook/rocksdb/blob/bc8b05cb779a578b5f5acf8d9390af1d17e65ff5/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java#L282

Best
唐云


From: claylin <1012539...@qq.com>
Sent: Friday, October 11, 2019 16:16
To: user-zh 
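Subject: Question about enabling 
state.backend.rocksdb.ttl.compaction.filter.enabled with RocksDBStateBackend

When using RocksDBStateBackend, to keep the state from growing so large that 
resources (disk) run out, I enabled 
state.backend.rocksdb.ttl.compaction.filter.enabled, so that each time RocksDB 
compacts it checks the TTL of the state and deletes expired state. This is also 
described at https://github.com/facebook/rocksdb/wiki/Time-to-Live. But could 
it happen that some state is not touched by a compaction, so that 
already-expired state is not deleted? Also, the TTL update types in Flink are 
only OnCreateAndWrite and OnReadAndWrite; there is no option for a fixed 
lifetime without refresh, for example a TTL of 1 day that definitely expires 
after one day. Without that, the TTL of a state may keep being refreshed and 
never expire, eventually filling up the disk. I have seen solutions that use a 
scheduled task to delete the state manually, but that seriously hurts 
performance. Does anyone have other suggestions?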


Re: Re: Question about Flink's locally cached files being deleted

2019-10-11 Thread Yun Tang
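Hi 嘉诚

This exception occurs because your task was cancelled, so the related 
directories were cleaned up, and as a result the file in the original directory 
could not be found when the hard link was created.
In other words, the task "累积数据 -> Sink: 写入到HBase (12/12)(55962df9fd694ed1f82b8f3ec2aaf6c4)" 
is a victim: some other exception caused the whole job to fail over, which then 
cancelled this task. You should find the first failed task in the JobManager 
log; the exception there is the root cause.

Best
唐云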

From: 戴嘉诚 
Sent: Friday, October 11, 2019 15:17
To: user-zh 
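Subject: Re: Re: Question about Flink's locally cached files being deleted

Hi
   This is the log I downloaded after the exception occurred this morning. Could you please take a look?
 taskmanager.log
<https://drive.google.com/file/d/17nP8yxSpdAnDDgBEbEUrDXYx-rwosi52/view?usp=drive_web>

On Fri, Oct 11, 2019 at 2:56 PM, Yun Tang wrote: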

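> Hi 戴嘉诚
>
> Your exception happens during failover. When RocksDB restores, it downloads
> files from HDFS into a temporary restore directory and then hard-links them
> into the DB directory. A file that had already been downloaded to the
> temporary restore directory was suddenly reported as missing when the hard
> link was created [1]. This exception is rather strange. Could you first
> share the full logs of the affected TaskManager?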
>
>
> [1]
> https://github.com/apache/flink/blob/cda6dc0c44239aa7a36105988328de5744aea125/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
>
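> Best
> 唐云
> 
> From: 戴嘉诚 
> Sent: Friday, October 11, 2019 14:54
> To: user-zh@flink.apache.org 
> Subject: Re: Question about Flink's locally cached files being deleted
>
> Hi,
> I have already explicitly set a uid for every operator in my code.
>
> From: Qi Kang
> Sent: October 11, 2019 14:48
> To: user-zh@flink.apache.org
> Subject: Re: Question about Flink's locally cached files being deleted
>
> Hi,
>
> Judging from the error, the auto-generated operator IDs may have changed, so
> the state stored in the state backend cannot be retrieved. A checkpoint
> maintains an [operator ID -> state] mapping, so Flink recommends explicitly
> setting IDs on operators with the uid() method to ensure a proper restore.
> See the documentation [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#assigning-operator-ids
>
>
> > On Oct 11, 2019, at 11:00, 戴嘉诚  wrote:
> >
> > Hi all:
> > I recently migrated my programs to Flink 1.9 on a YARN session. One
> > program reads Kafka messages, keys them, aggregates them in a one-second
> > window, and then writes to HBase asynchronously in batches. Since the
> > migration it frequently throws exceptions, such as: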
> > java.lang.Exception: Exception while creating StreamOperatorStateContext.
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> > at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for WindowOperator_3572e2ca02acab3056013a7c1c4fa645_(6/6)
> from any of the 1 provided restore options.
> > at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> > ... 6 more
> > Caused by: org.apache.flink.runtime.state.BackendBuildingException:
> Caught unexpected exception.
> > at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
> > at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> > at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> > at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> > ... 8 more
> > Caused by: java.nio.file.NoSuchFileException:
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/db/000835.sst
> ->
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/14634105-83be-429e-9cdc-36dbff14a7fa/000835.sst
> > at
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> > at
> sun.nio

Re: Re: Question about Flink's locally cached files being deleted

2019-10-11 Thread Yun Tang
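Hi 戴嘉诚

Your exception happens during failover. When RocksDB restores, it downloads 
files from HDFS into a temporary restore directory and then hard-links them 
into the DB directory. A file that had already been downloaded to the temporary 
restore directory was suddenly reported as missing when the hard link was 
created [1]. This exception is rather strange. Could you first share the full 
logs of the affected TaskManager?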


[1] 
https://github.com/apache/flink/blob/cda6dc0c44239aa7a36105988328de5744aea125/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473

Best
唐云

From: 戴嘉诚 
Sent: Friday, October 11, 2019 14:54
To: user-zh@flink.apache.org 
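Subject: Re: Question about Flink's locally cached files being deleted

Hi,
I have already explicitly set a uid for every operator in my code.

From: Qi Kang
Sent: October 11, 2019 14:48
To: user-zh@flink.apache.org
Subject: Re: Question about Flink's locally cached files being deleted

Hi,

Judging from the error, the auto-generated operator IDs may have changed, so 
the state stored in the state backend cannot be retrieved. A checkpoint 
maintains an [operator ID -> state] mapping, so Flink recommends explicitly 
setting IDs on operators with the uid() method to ensure a proper restore. See 
the documentation [1].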

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#assigning-operator-ids


> On Oct 11, 2019, at 11:00, 戴嘉诚  wrote:
>
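> Hi all:
> I recently migrated my programs to Flink 1.9 on a YARN session. One program
> reads Kafka messages, keys them, aggregates them in a one-second window, and
> then writes to HBase asynchronously in batches. Since the migration it
> frequently throws exceptions, such as: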
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_3572e2ca02acab3056013a7c1c4fa645_(6/6) from 
> any of the 1 provided restore options.
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> ... 6 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 8 more
> Caused by: java.nio.file.NoSuchFileException: 
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/db/000835.sst
>  -> 
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/14634105-83be-429e-9cdc-36dbff14a7fa/000835.sst
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at 
> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
> at java.nio.file.Files.createLink(Files.java:1086)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:473)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:212)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
> at 
> 

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Yun Tang
Hi Hao

It seems that I misunderstood the background of your use case. The 
high-availability configuration targets fault tolerance, not general job 
evolution during development. If you want to change your job topology, just 
follow the general rule of restoring from a savepoint/checkpoint; do not rely 
on HA for job migration.

Best
Yun Tang

From: Hao Sun 
Sent: Friday, October 11, 2019 8:33
To: Yun Tang 
Cc: Vijay Bhaskar; Yang Wang; Sean Hester; Aleksandar Mastilovic;
Yuval Itzchakov; user
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Yep, I know that option. That's what confuses me as well. In an HA setup,
where do I supply this option (allowNonRestoredState)?
As I remember, this option requires a savepoint path when I start a Flink job,
and HA does not require the path.

Hao Sun


On Thu, Oct 10, 2019 at 11:16 AM Yun Tang <myas...@live.com> wrote:
Just a minor supplement @Hao Sun <ha...@zendesk.com>: if you decide to drop an
operator, don't forget to add the --allowNonRestoredState (short: -n) option
[1]


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

Best
Yun Tang


From: Vijay Bhaskar <bhaskar.eba...@gmail.com>
Sent: Thursday, October 10, 2019 19:24
To: Yang Wang <danrtsey...@gmail.com>
Cc: Sean Hester <sean.hes...@bettercloud.com>; Aleksandar Mastilovic
<amastilo...@sightmachine.com>; Yun Tang <myas...@live.com>; Hao Sun
<ha...@zendesk.com>; Yuval Itzchakov <yuva...@gmail.com>; user
<user@flink.apache.org>
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Thanks Yang. We will try and let you know if any issues arise

Regards
Bhaskar

On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <danrtsey...@gmail.com> wrote:
@Hao Sun,
I have confirmed that even if we change the parallelism and/or modify
operators or add new operators, the Flink cluster can still recover from the
latest checkpoint.

@Vijay
a) If some individual jobmanager/taskmanager crashes unexpectedly (while other
jobmanagers and taskmanagers are alive), the job can recover from the latest
checkpoint.
b) If all jobmanagers and taskmanagers fail, the job can still recover from
the latest checkpoint as long as the cluster-id is not changed.

When HA is enabled, the metadata of the job graph and checkpoints is saved in
ZooKeeper and the actual files are saved on the high-availability storage
(HDFS). So when the Flink application is submitted again with the same
cluster-id, it can recover jobs and checkpoints from ZooKeeper. I think this
has been supported for a long time. Maybe you could try it with Flink 1.8 or
1.9.
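
To make the role of the cluster-id concrete, here is a small sketch that
renders the HA-related entries of flink-conf.yaml. The four keys are Flink's
documented `high-availability.*` options; the ZooKeeper hosts, the HDFS path
and the cluster-id value are placeholders chosen for illustration, not values
from this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HaConfigSketch {
    // Collects the HA-related flink-conf.yaml entries.
    // Every value below is a placeholder (assumption), only the keys are real.
    static Map<String, String> haSettings(String clusterId) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("high-availability", "zookeeper");
        conf.put("high-availability.zookeeper.quorum", "zk-1:2181,zk-2:2181,zk-3:2181");
        conf.put("high-availability.storageDir", "hdfs:///flink/ha/");
        // Resubmitting with the SAME cluster-id is what lets a fresh cluster
        // find the job graph and checkpoint metadata left in ZooKeeper.
        conf.put("high-availability.cluster-id", clusterId);
        return conf;
    }

    // Renders the settings as "key: value" lines.
    static String toYaml(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        conf.forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(toYaml(haSettings("/my-job-cluster")));
    }
}
```

If the cluster-id changes between deployments, the new cluster looks under a
different ZooKeeper path and starts without the previous checkpoints.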

Best,
Yang


On Thu, Oct 10, 2019 at 2:26 PM Vijay Bhaskar <bhaskar.eba...@gmail.com> wrote:
Thanks Yang and Sean. I have a couple of questions:

1) Suppose we need to bring back the entire cluster:
 a) In that case, at least one job manager in the HA group should be up and
running, right? Or
 b) does this also work when all the job managers fail? In that case, please
let me know the procedure or share the documentation.
 How do we start from the previous checkpoint?
 From which Flink version onwards is this feature stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <danrtsey...@gmail.com> wrote:
Hi Vijay,

If you are using the HA solution, I think you do not need to specify the
savepoint. Instead, the checkpoint is used.
Checkpoints are taken automatically and periodically based on your
configuration. When the jobmanager/taskmanager fails or the whole cluster
crashes, the job can always recover from the latest checkpoint. Does this meet
your requirement?

Best,
Yang

On Tue, Oct 1, 2019 at 1:47 AM Sean Hester <sean.hes...@bettercloud.com> wrote:
Vijay,

That is my understanding as well: the HA solution only solves the problem up to 
the point all job managers fail/restart at the same time. That's where my 
original concern was.

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per 
cluster--as long as they are all deployed to separate GKE nodes--would provide 
a very high uptime/low failure rate, at least on paper. It's a promising enough 
option that we're going to run in HA for a month or two and monitor results 
before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bhaskar.eba...@gmail.com> wrote:
I don't think HA will help to recover from a cluster crash; for that we should
take periodic savepoints, right? Please correct me if I am wrong.

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bhaskar.eba...@gmail.com> wrote:
Suppose my cluster crashed and I need to bring the entire cluster back up.
Does HA still help to run the cluster from l

Re: Flink restoring a job from a checkpoint

2019-10-10 Thread Yun Tang
A checkpoint can only complete if your job has not failed. Since checkpoint
barriers are injected into the stream together with the messages, if the
problematic message causes your job to fail, no checkpoint can complete after
that message has been processed. In other words, you can always resume your
job from a Kafka offset before that problematic message.
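
The argument can be sketched as a toy simulation in plain Java (no Flink
involved). The class name, the fixed barrier interval and the offsets are
made up for illustration; real checkpoints are time-based, but the ordering
argument is the same: a barrier that follows the failing record never gets
processed, so the last completed checkpoint always points at or before it.

```java
class PoisonMessageSketch {
    /**
     * Replays offsets 0, 1, 2, ... until the record at poisonOffset fails the job.
     * A checkpoint barrier is simulated after every `interval` records.
     * Returns the resume offset stored in the last completed checkpoint,
     * or -1 if no checkpoint completed before the failure.
     */
    static long lastCheckpointedOffset(long poisonOffset, long interval) {
        long lastCheckpoint = -1;
        for (long offset = 0; ; offset++) {
            if (offset == poisonOffset) {
                // Processing this record throws: the job fails and no barrier
                // behind this record can ever complete.
                return lastCheckpoint;
            }
            // Record processed successfully; a barrier may follow it.
            if ((offset + 1) % interval == 0) {
                lastCheckpoint = offset + 1; // next offset to read after a restore
            }
        }
    }

    public static void main(String[] args) {
        long resumeFrom = lastCheckpointedOffset(1050, 100);
        System.out.println("resume from offset " + resumeFrom); // at or before 1050
    }
}
```

After a restore the job replays from that offset and reaches the problematic
message again, so the job logic has to be fixed before it can move past it.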

Best
Yun Tang

From: Flavio Pompermaier 
Sent: Friday, October 11, 2019 5:50
To: Yun Tang 
Cc: Congxian Qiu; theo.diefent...@scoop-software.de; user
Subject: Re: Flink restoring a job from a checkpoint

Sorry for the dumb question, but let's suppose I do not use retained
checkpoints and my job has processed billions of messages from Kafka. Then a
problematic message causes my job to fail. Am I able to complete a savepoint
to fix the job and restart from the problematic message (i.e. the last "valid"
Kafka offset)?

On Thu, Oct 10, 2019, 20:01 Yun Tang <myas...@live.com> wrote:
Hi Vishwas

Imagine this scenario: your last committed offset is A, with a savepoint-A,
and then you stop the job and try new program logic, such as printing your
output instead of writing to the previous sink, to do some experiments. The
new experimental job might commit offset-B to Kafka. Once verified, you still
need to resume from Kafka offset-A to ensure all data has been written to the
target sink. This is easier if you just restore the job from savepoint-A.

In other words, Flink already provides a stronger and more flexible mechanism
to resume Kafka offsets, so why not use it?
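
The scenario can be written down as a few lines of plain Java. This is only a
model of the start-position rule (offsets restored from a savepoint or
checkpoint take precedence over the group offsets committed in Kafka); the
class name and the concrete offsets 1000 and 1500 are hypothetical.

```java
class OffsetResumeSketch {
    /**
     * Offset a restarted job begins reading from.
     * savepointOffset is null when the job is started without -s.
     */
    static long startOffset(Long savepointOffset, long committedGroupOffset) {
        // Restored state wins; the committed group offset is only a fallback.
        return savepointOffset != null ? savepointOffset : committedGroupOffset;
    }

    public static void main(String[] args) {
        long offsetA = 1000; // stored in savepoint-A when the production job stopped
        long offsetB = 1500; // committed later by the experimental job, same group

        // Plain restart: begins at B, so records 1000..1499 never reach the real sink.
        System.out.println("plain restart starts at " + startOffset(null, offsetB));

        // Restore from savepoint-A: begins at A regardless of what was committed since.
        System.out.println("restore starts at " + startOffset(offsetA, offsetB));
    }
}
```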

Best
Yun Tang

From: Congxian Qiu <qcx978132...@gmail.com>
Sent: Thursday, October 10, 2019 11:52
To: theo.diefent...@scoop-software.de
Cc: user <user@flink.apache.org>
Subject: Re: Flink restoring a job from a checkpoint

Hi Vishwas

Sorry for the confusion; what Theo said earlier is what I meant.
What I said previously was from Flink's side: if we do not restore from a
checkpoint/savepoint, none of the TMs will have any state, so the job starts
from scratch.

Best,
Congxian


On Thu, Oct 10, 2019 at 1:15 AM theo.diefent...@scoop-software.de wrote:
Hi Vishwas,

With "from scratch", Congxian means that Flink won't load any state
automatically and starts as if there was no state. Of course if the kafka
consumer group already exists and you have configured Flink to start from group
offsets if there is no state yet, it will start from the group offsets.

I think your approach is totally fine. Ignoring savepoints and not retaining
checkpoints saves overhead and configuration burden and works nicely as long
as you don't have any state in your pipeline.

You should however be certain that nobody in your team will add something with
state later on and forget to think about the missing state...

Best regards
Theo




 Original message 
Subject: Re: Flink restoring a job from a checkpoint
From: Vishwas Siravara
To: Congxian Qiu
Cc: Yun Tang, user

Hi Congxian,
Thanks for getting back. Why would the streaming start from scratch if my
consumer group does not change? I start from the group offsets:
env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")
So when I restart the job, it should consume from the last offset committed to
Kafka, shouldn't it? Let me know what you think.

Best,
Vishwas
On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
Hi Vishwas

Currently, Flink can only restore a retained checkpoint or savepoint with the
`-s` parameter [1][2]; otherwise, it will start from scratch.

```
checkpoint---> bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint --> bin/flink run -s :savepointPath [:runArgs]
```

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints

Best,
Congxian


On Wed, Oct 9, 2019 at 5:07 AM Vishwas Siravara <vsirav...@gmail.com> wrote:
Hi Yun,
Thanks for your reply. I do start from GROUP_OFFSET. Here is the code snippet:

env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")

I have also enabled and externalized checkpointing to S3 .
Why is it not recommended to just restart the job once I cancel it, as long as 
the topology does not change? What is the advantage of explicitly restoring 
from last checkpoint by passing the -s option to the flink command line if it 
does the same thing? For instance if 
s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
 is my last successf

Re: Where are uploaded Job jars stored?

2019-10-10 Thread Yun Tang
Hi John

The jar is not stored in the HA path; I think the answer in [1] could help you.

[1] 
https://stackoverflow.com/questions/51936608/where-can-i-find-my-jar-on-apache-flink-server-which-i-submitted-using-apache-fl

Best
Yun Tang

From: John Smith 
Sent: Friday, October 11, 2019 2:06
To: user 
Subject: Where are uploaded Job jars stored?

Hi, I'm using 1.8.0 running on a standalone cluster with ZooKeeper HA.

Are job JARs stored at high-availability.storageDir?

The thing is, when you browse the individual nodes at port 8080 to submit the
job, only the node where you uploaded the JAR has it.

- Go to any given node
- Upload a jar
- Browse another node
- Jar is not there.




Re: Best coding practises guide while programming using flink apis

2019-10-10 Thread Yun Tang
Hi Terry

Flink has a code-style and quality guide for contributing code [1]; this might
not be directly what you want, but I hope it helps a bit.

As more and more big data systems recommend high-level, declarative APIs such
as SQL and the Table API [2], I think GoF design patterns might not play an
important role.

[1] https://flink.apache.org/contributing/code-style-and-quality-preamble.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/concepts/programming-model.html#levels-of-abstraction

Best
Yun Tang



From: Deepak Sharma 
Sent: Monday, September 23, 2019 10:25
To: Terry Wang ; user 
Cc: dev 
Subject: Re: Best coding practises guide while programming using flink apis

Thanks Terry.
I would need some volunteers to speak about their use cases and the best
practices they have been following around Flink.

―DK

On Sun, 22 Sep 2019 at 5:36 PM, Terry Wang <zjuwa...@gmail.com> wrote:
Hi, Deepak~

I appreciate your idea and have cc'd the dev mailing list too.

Best,
Terry Wang



On Sep 22, 2019, at 2:12 PM, Deepak Sharma <deepakmc...@gmail.com> wrote:

Hi All
I guess we need to put some examples in the documentation around best coding
practices, concurrency, non-blocking IO and design patterns for writing
Apache Flink pipelines.
Is there any such guide available?
E.g. when and how to use the GoF design patterns. Code snippets explaining
them could be included as well.

This guide can come from people already running beam in production who have
written it with all best practices in mind.
It will help in greater and wider adoption.

Just a thought.
Please let me know if anyone wants to contribute, and I can lead this
initiative by documenting it in the Flink wiki.

Thanks
--
Thanks
Deepak
www.bigdatabig.com<http://www.bigdatabig.com/>
www.keosha.net<http://www.keosha.net/>

--
Thanks
Deepak
www.bigdatabig.com<http://www.bigdatabig.com>
www.keosha.net<http://www.keosha.net>


Re: Async and checkpointing

2019-10-10 Thread Yun Tang
Hi  Anurag

What do you mean by "will the checkpoint pointer move at all or not"?

If one of your threads fails to send a record and that causes the sub-task to
fail, it leads to a job failover. On failover, any ongoing checkpoint is
aborted and the job restores from the latest completed checkpoint.
If failing to send a record does not cause the sub-task to fail, nothing
happens and the job continues to run, but this might not be what you want.
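
The two outcomes can be illustrated with plain `CompletableFuture`s. This is
an analogy in JDK-only Java, not Flink code: the class and method names are
made up, and "sub-task fails" is modelled as an exception escaping `join()`.
In a Flink `AsyncFunction` the propagating case would correspond to calling
`ResultFuture.completeExceptionally(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AsyncFailureSketch {
    // Starts `capacity` simulated requests; request number failingIndex throws
    // (pass -1 for "no failure").
    static List<CompletableFuture<String>> send(int capacity, int failingIndex) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (int i = 0; i < capacity; i++) {
            final int id = i;
            inFlight.add(CompletableFuture.supplyAsync(() -> {
                if (id == failingIndex) {
                    throw new IllegalStateException("send failed for S" + id);
                }
                return "S" + id;
            }));
        }
        return inFlight;
    }

    // Case 1: errors are propagated, so one failed request fails the "sub-task".
    static boolean subTaskFails(List<CompletableFuture<String>> inFlight) {
        try {
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    // Case 2: errors are swallowed, the "sub-task" keeps running and the
    // failed record is silently lost.
    static boolean subTaskFailsWhenSwallowed(List<CompletableFuture<String>> inFlight) {
        List<CompletableFuture<String>> handled = new ArrayList<>();
        for (CompletableFuture<String> f : inFlight) {
            handled.add(f.exceptionally(t -> "dropped"));
        }
        return subTaskFails(handled);
    }

    public static void main(String[] args) {
        System.out.println("propagated: fails = " + subTaskFails(send(100, 49)));
        System.out.println("swallowed:  fails = " + subTaskFailsWhenSwallowed(send(100, 49)));
    }
}
```

Only the first variant gives the restore-from-checkpoint behaviour described
above; the second trades it for silent data loss.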

Best
Yun Tang

From: anurag 
Sent: Friday, October 11, 2019 2:03
To: user@flink.apache.org 
Subject: Async and checkpointing

Hi All,
Thanks for your help in advance. I am using async I/O with Flink 1.5.5 and the
AsyncDataStream.unorderedWait method, with my capacity set to 100.

My question: since my capacity is 100, each thread will be processing one
record. Say the sequence numbers of my records are S1, S2, ..., S100, and at a
particular point in time thread T1 is processing record S1, T2 is processing
record S2, and T100 is processing record S100. Say all the threads T1..T100
except T50 failed when sending the record to the sink. In this case, will the
checkpoint pointer move at all or not?


Apologies in advance if my question is not clear.

Thanks,
Anurag


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Yun Tang
Just a minor supplement @Hao Sun <ha...@zendesk.com>: if you decide to drop an
operator, don't forget to add the --allowNonRestoredState (short: -n) option
[1]


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

Best
Yun Tang


From: Vijay Bhaskar 
Sent: Thursday, October 10, 2019 19:24
To: Yang Wang 
Cc: Sean Hester; Aleksandar Mastilovic; Yun Tang; Hao Sun; Yuval Itzchakov;
user
Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Thanks Yang. We will try and let you know if any issues arise

Regards
Bhaskar

On Thu, Oct 10, 2019 at 1:53 PM Yang Wang <danrtsey...@gmail.com> wrote:
@Hao Sun,
I have confirmed that even if we change the parallelism and/or modify
operators or add new operators, the Flink cluster can still recover from the
latest checkpoint.

@Vijay
a) If some individual jobmanager/taskmanager crashes unexpectedly (while other
jobmanagers and taskmanagers are alive), the job can recover from the latest
checkpoint.
b) If all jobmanagers and taskmanagers fail, the job can still recover from
the latest checkpoint as long as the cluster-id is not changed.

When HA is enabled, the metadata of the job graph and checkpoints is saved in
ZooKeeper and the actual files are saved on the high-availability storage
(HDFS). So when the Flink application is submitted again with the same
cluster-id, it can recover jobs and checkpoints from ZooKeeper. I think this
has been supported for a long time. Maybe you could try it with Flink 1.8 or
1.9.

Best,
Yang


On Thu, Oct 10, 2019 at 2:26 PM Vijay Bhaskar <bhaskar.eba...@gmail.com> wrote:
Thanks Yang and Sean. I have a couple of questions:

1) Suppose we need to bring back the entire cluster:
 a) In that case, at least one job manager in the HA group should be up and
running, right? Or
 b) does this also work when all the job managers fail? In that case, please
let me know the procedure or share the documentation.
 How do we start from the previous checkpoint?
 From which Flink version onwards is this feature stable?

Regards
Bhaskar


On Wed, Oct 9, 2019 at 8:51 AM Yang Wang <danrtsey...@gmail.com> wrote:
Hi Vijay,

If you are using the HA solution, I think you do not need to specify the
savepoint. Instead, the checkpoint is used.
Checkpoints are taken automatically and periodically based on your
configuration. When the jobmanager/taskmanager fails or the whole cluster
crashes, the job can always recover from the latest checkpoint. Does this meet
your requirement?

Best,
Yang

On Tue, Oct 1, 2019 at 1:47 AM Sean Hester <sean.hes...@bettercloud.com> wrote:
Vijay,

That is my understanding as well: the HA solution only solves the problem up to 
the point all job managers fail/restart at the same time. That's where my 
original concern was.

But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers per 
cluster--as long as they are all deployed to separate GKE nodes--would provide 
a very high uptime/low failure rate, at least on paper. It's a promising enough 
option that we're going to run in HA for a month or two and monitor results 
before we put in any extra work to customize the savepoint start-up behavior.

On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar <bhaskar.eba...@gmail.com> wrote:
I don't think HA will help to recover from a cluster crash; for that we should
take periodic savepoints, right? Please correct me if I am wrong.

Regards
Bhaskar

On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar <bhaskar.eba...@gmail.com> wrote:
Suppose my cluster crashed and I need to bring the entire cluster back up.
Does HA still help to run the cluster from the latest savepoint?

Regards
Bhaskar

On Thu, Sep 26, 2019 at 7:44 PM Sean Hester <sean.hes...@bettercloud.com> wrote:
thanks to everyone for all the replies.

i think the original concern here with "just" relying on the HA option is that 
there are some disaster recovery and data center migration use cases where the 
continuity of the job managers is difficult to preserve. but those are 
admittedly very edgy use cases. i think it's definitely worth reviewing the 
SLAs with our site reliability engineers to see how likely it would be to 
completely lose all job managers under an HA configuration. that small a risk 
might be acceptable/preferable to a one-off solution.

@Aleksander, would love to learn more about Zookeeper-less HA. i think i 
spotted a thread somewhere between Till and someone (perhaps you) about that. 
feel free to DM me.

thanks again to everyone!

On Thu, Sep 26, 2019 at 7:32 AM Yang Wang <danrtsey...@gmail.com> wrote:
Hi, Aleksandar

Savepoint option in standalone job cluster is optional. If you want to always 
recover
from the latest checkpoint, just as Aleksandar and Yun Tang said you could use 
the
high-availability configuration. Make sure the cluster-id is not changed, i 
thi

Re: Flink restoring a job from a checkpoint

2019-10-10 Thread Yun Tang
Hi Vishwas

Imagine this scenario: your last committed offset is A, with a savepoint-A,
and then you stop the job and try new program logic, such as printing your
output instead of writing to the previous sink, to do some experiments. The
new experimental job might commit offset-B to Kafka. Once verified, you still
need to resume from Kafka offset-A to ensure all data has been written to the
target sink. This is easier if you just restore the job from savepoint-A.

In other words, Flink already provides a stronger and more flexible mechanism
to resume Kafka offsets, so why not use it?

Best
Yun Tang

From: Congxian Qiu 
Sent: Thursday, October 10, 2019 11:52
To: theo.diefent...@scoop-software.de 
Cc: user 
Subject: Re: Flink restoring a job from a checkpoint

Hi Vishwas

Sorry for the confusion; what Theo said earlier is what I meant.
What I said previously was from Flink's side: if we do not restore from a
checkpoint/savepoint, none of the TMs will have any state, so the job starts
from scratch.

Best,
Congxian


On Thu, Oct 10, 2019 at 1:15 AM theo.diefent...@scoop-software.de wrote:
Hi Vishwas,

With "from scratch", Congxian means that Flink won't load any state
automatically and starts as if there was no state. Of course if the kafka
consumer group already exists and you have configured Flink to start from group
offsets if there is no state yet, it will start from the group offsets.

I think your approach is totally fine. Ignoring savepoints and not retaining
checkpoints saves overhead and configuration burden and works nicely as long
as you don't have any state in your pipeline.

You should however be certain that nobody in your team will add something with
state later on and forget to think about the missing state...

Best regards
Theo




 Original message 
Subject: Re: Flink restoring a job from a checkpoint
From: Vishwas Siravara
To: Congxian Qiu
Cc: Yun Tang, user

Hi Congxian,
Thanks for getting back. Why would the streaming start from scratch if my
consumer group does not change? I start from the group offsets:
env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")
So when I restart the job, it should consume from the last offset committed to
Kafka, shouldn't it? Let me know what you think.

Best,
Vishwas
On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
Hi Vishwas

Currently, Flink can only restore a retained checkpoint or savepoint with the
`-s` parameter [1][2]; otherwise, it will start from scratch.

```
checkpoint---> bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint --> bin/flink run -s :savepointPath [:runArgs]
```

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints

Best,
Congxian


On Wed, Oct 9, 2019 at 5:07 AM Vishwas Siravara <vsirav...@gmail.com> wrote:
Hi Yun,
Thanks for your reply. I do start from GROUP_OFFSET. Here is the code snippet:

env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")

I have also enabled and externalized checkpointing to S3 .
Why is it not recommended to just restart the job once I cancel it, as long as 
the topology does not change? What is the advantage of explicitly restoring 
from last checkpoint by passing the -s option to the flink command line if it 
does the same thing? For instance if 
s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
 is my last successful checkpoint, what is the difference between 1 and 2.

1. /usr/mware/flink/bin/flink run -d -C file:///usr/mware/flink/externalconfig/ 
-c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 8 -cp 
qa_streaming
2. /usr/mware/flink/bin/flink run -s 
s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
 -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
flink-job-assembly.jar flink druid -p 4 -cp qa_streaming

Thanks,
Vishwas

On Tue, Oct 8, 2019 at 1:51 PM Yun Tang <myas...@live.com> wrote:
Hi Vishwas

If you did not configure
org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is
GROUP_OFFSETS by default, which means "Start from committed offsets in ZK /
Kafka brokers of a specific consumer group". You also need to enable
checkpointing so that Kafka offsets are committed when a checkpoint completes.

In other words, even if you don't resume from a checkpoint, as long as
checkpointing was enabled in the previous job and the startup mode is
GROUP_OFFSETS, you can restore from the last committed offset if the previous
checkpoint completed [1][2].
However, this is no

Re: Re:Memory constrains running Flink on Kubernetes

2019-10-10 Thread Yun Tang
Hi Shengjk1

setBlockCacheSize, setWriteBufferSize and setMaxWriteBufferNumber can help you
control memory usage. However, Flink stores each state in its own column
family, so the number of column families grows with the number of states, and
each column family has its own write buffer. FRocksDB [1] already plans to fix
this by introducing RocksDB's write buffer manager feature [2]. We will try to
fix FLINK-7289 before the Flink 1.10 release.

If you urgently need to fix this problem, I have an unofficial build of
FRocksDB based on RocksDB 5.18.3, which has been verified to work well in
Gyula Fora's experience. You can contact me privately to get this jar and
rebuild your Flink runtime to enable the write buffer manager feature.


[1] https://github.com/dataArtisans/frocksdb/pull/4
[2] 
https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager#limit-total-memory-of-memtables

Best
Yun Tang


From: shengjk1 
Sent: Thursday, October 10, 2019 20:37
To: wvl 
Cc: user@flink.apache.org 
Subject: Re:Memory constrains running Flink on Kubernetes

+1

I also encountered a similar problem, but I run a Flink application that uses
RocksDB state on YARN. The YARN container was killed because of OOM.
I also read the RocksDB tuning guide [1] and tuned some parameters, but it did
not help. For example:

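import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

// RocksDB options tried so far to limit memory usage.
class MyOptions1 implements OptionsFactory {
    // DB-wide options: write buffer limit, background threads, open files.
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions.setDbWriteBufferSize(64 * 1024 * 1024)
            .setIncreaseParallelism(2)
            .setMaxBackgroundFlushes(2)
            .setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)
            .setMaxOpenFiles(4)
            .setUseFsync(false);
    }

    // Per-column-family options: block cache, block size, write buffers.
    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions.setTableFormatConfig(
                new BlockBasedTableConfig()
                    .setBlockCacheSize(16 * 1024 * 1024)
                    // increases read amplification but decreases memory usage and space amplification
                    .setBlockSize(16 * 1024 * 1024))
            .setWriteBufferSize(16 * 1024 * 1024)
            .setMaxWriteBufferNumber(1);
    }
}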

Additionally, there is FLINK-7289 [2], which is similar to our problem, but I
haven't found a good way to fix it.


[1] https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
[2] https://issues.apache.org/jira/browse/FLINK-7289



Best,
Shengjk1


On 07/24/2019 03:48, wvl <lee...@gmail.com> wrote:
Hi,

We're running a relatively simple Flink application that uses a bunch of state
in RocksDB on Kubernetes.
During the course of development and going to production, we found that we were
often running into memory issues made apparent by Kubernetes OOMKilled and Java
OOM log events.

In order to tackle these, we're trying to account for all the memory used in 
the container, to allow proper tuning.
Metric-wise we have:
- container_memory_working_set_bytes = 6.5 GB
- flink_taskmanager_Status_JVM_Memory_Heap_Max = 4.7 GB
- flink_taskmanager_Status_JVM_Memory_NonHeap_Used = 325 MB
- flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed = 500 MB

This is my understanding based on all the documentation and observations:
container_memory_working_set_bytes will be the total amount of memory in use, 
disregarding OS page & block cache.
Heap will be heap.
NonHeap is mostly the metaspace.
Direct_Memory is mostly network buffers.

Running the numbers, I have 1 GB unaccounted for. I'm also uncertain about
RocksDB. According to the docs, RocksDB has a "Column Family Write Buffer" where
"You need to budget for 2 x your worst case memory use".
We have 17 ValueStateDescriptors (ignoring state for windows), each of which
I'm assuming corresponds to a "Column Family" in RocksDB, meaning our budget
should be around 2 GB.
Is this accounted for in one of the flink_taskmanager metrics above? We've also
enabled various RocksDB metrics, but it's unclear where this write buffer
memory would be represented.
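
The arithmetic behind these two figures can be reproduced as follows. The
container numbers are the metrics listed above, in decimal MB. The write
buffer size of 64 MB and the maximum of 2 write buffers per column family are
assumed to be the RocksDB defaults in effect; the thread does not confirm
which values this job actually runs with.

```java
class MemoryBudgetSketch {
    // Memory in the container that none of the JVM metrics explain.
    static long unaccountedMb(long workingSetMb, long heapMb, long nonHeapMb, long directMb) {
        return workingSetMb - heapMb - nonHeapMb - directMb;
    }

    // Worst-case memtable memory: every column family fills all of its write buffers.
    static long memtableBudgetMb(int columnFamilies, long writeBufferMb, int maxWriteBuffers) {
        return columnFamilies * writeBufferMb * maxWriteBuffers;
    }

    public static void main(String[] args) {
        // 6.5 GB working set, 4.7 GB heap max, 325 MB non-heap, 500 MB direct
        long unaccounted = unaccountedMb(6500, 4700, 325, 500);
        System.out.println("unaccounted: " + unaccounted + " MB");

        // 17 states, assumed 64 MB write buffer, assumed 2 buffers per family
        long memtables = memtableBudgetMb(17, 64, 2);
        System.out.println("memtable worst case: " + memtables + " MB");
    }
}
```

Under these assumptions the worst-case write buffer budget is roughly twice
the unaccounted gigabyte, which suggests the buffers were not all full when
the metrics were sampled. RocksDB memory is native memory, so it does not
appear in the heap, non-heap or direct-memory metrics of the JVM.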

Finally, we've seen that when our job has issues and is restarted rapidly,
NonHeap_Used grows from an initial 50 MB to 700 MB before our containers are
killed. We're assuming this is due to no form of cleanup in the metaspace as
classes get (re)loaded.

These are our taskmanager JVM settings: -XX:+UseG1GC -XX:MaxDirectMemorySize=1G 
-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap 
-XX:MaxRAMFraction=2
With flink config:
  taskmanager.heap.size: 5000m
  state.backend: rocksdb
  state.backend.incremental: true
  state.backend.rocksdb.timer-service.factory: ROCKSDB

Based on what we've observed we're thinking about setting -XX:MaxMetaspaceSize 
to a reasonable value, so that we at least get an error message which can 
easily be traced back to the behavior we're seeing.
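
One way to watch this growth from inside the JVM is the standard
`java.lang.management` API, sketched below. The class name is made up, and
the pool name "Metaspace" is what HotSpot reports; other JVMs may name it
differently, in which case the methods return -1.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

class MetaspaceProbe {
    // Finds the usage of the memory pool named "Metaspace", or null if absent.
    private static MemoryUsage metaspaceUsage() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if (pool.getName().contains("Metaspace")) {
                return pool.getUsage(); // may be null if the pool is not valid
            }
        }
        return null;
    }

    // Bytes currently used by the metaspace, or -1 if the pool is not exposed.
    static long usedBytes() {
        MemoryUsage usage = metaspaceUsage();
        return usage == null ? -1 : usage.getUsed();
    }

    // Configured limit in bytes; -1 when no -XX:MaxMetaspaceSize is in effect
    // or the pool is not exposed.
    static long maxBytes() {
        MemoryUsage usage = metaspaceUsage();
        return usage == null ? -1 : usage.getMax();
    }

    public static void main(String[] args) {
        System.out.println("metaspace used: " + usedBytes() + " bytes");
        System.out.println("metaspace max:  " + maxBytes() + " bytes");
    }
}
```

Logging these two values after each job restart would show whether class
metadata from earlier restarts is ever released.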

Okay, all that said let's sum up what we're asking here:
- Is there any more insight into how memory is accounted for than our current 
metrics?
- Which metric, if any accounts for RocksDB memory usage?
- What's going on with the Metaspace growth we're seeing during job restarts, 
is there something we can do about this such as s

Re: Difference between windows in Spark and Flink

2019-10-10 Thread Yun Tang
Hi Felipe

Generally speaking, the key difference that impacts performance is where they
store the data within windows.
Flink stores the data and its related timestamp within windows in the state
backend [1]. Once a window is triggered, it pulls all the stored timers
together with their record keys, and then uses each record key to query the
state backend for the next actions.
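
As a rough mental model of that flow, here is a toy keyed tumbling window in
plain Java. It is a simplification for illustration, not Flink's
implementation: the class is hypothetical, a HashMap stands in for the state
backend, a TreeMap stands in for the timer service, timestamps are assumed
non-negative, and values are reduced by summing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class KeyedWindowSketch {
    private final long windowSizeMs;
    // Stand-in for the state backend: "windowStart|key" -> reduced value.
    private final Map<String, Long> state = new HashMap<>();
    // Stand-in for the timer service: window end -> keys with a pending window.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    KeyedWindowSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Adds one record: reduce it into the window state and register a timer.
    void add(String key, long timestamp, long value) {
        long start = timestamp - (timestamp % windowSizeMs);
        state.merge(start + "|" + key, value, Long::sum);
        timers.computeIfAbsent(start + windowSizeMs, t -> new TreeSet<>()).add(key);
    }

    // Fires every timer whose window end is <= now: the timer yields the key,
    // and the key is used to look up (and clear) the window state.
    List<String> advanceTo(long now) {
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, Set<String>> timer = timers.pollFirstEntry();
            long start = timer.getKey() - windowSizeMs;
            for (String key : timer.getValue()) {
                Long result = state.remove(start + "|" + key);
                fired.add(key + "@" + start + "=" + result);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        KeyedWindowSketch w = new KeyedWindowSketch(1000);
        w.add("a", 100, 1);
        w.add("a", 900, 2);
        w.add("b", 500, 5);
        System.out.println(w.advanceTo(1000));
    }
}
```

With a RocksDB state backend, both maps would live on disk rather than on the
JVM heap, which is the point of the comparison with Spark that follows.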

For Spark, we should first talk about Structured Streaming [2], as it is
better than the previous Spark Streaming, especially for window scenarios.
Unlike Flink, which has a built-in RocksDB state backend, Spark has only one
implementation of state store providers: HDFSBackedStateStoreProvider, which
stores all of the data in memory. This is a very memory-consuming approach and
might run into OOM errors [3][4][5].

To avoid this, Databricks Runtime offers a 'RocksDBStateStoreProvider' [6], but
it is not open source. Fortunately, open-source Flink already offers a
built-in RocksDB state backend to avoid the OOM problem. Moreover, the Flink
community is currently developing a spillable memory state backend [7].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html
[2] 
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
[3] 
https://medium.com/@chandanbaranwal/state-management-in-spark-structured-streaming-aaa87b6c9d31
[4] 
http://apache-spark-user-list.1001560.n3.nabble.com/use-rocksdb-for-spark-structured-streaming-SSS-td34776.html#a34779
[5] https://github.com/chermenin/spark-states
[6] 
https://docs.databricks.com/spark/latest/structured-streaming/production.html#optimize-performance-of-stateful-streaming-queries
[7] https://issues.apache.org/jira/browse/FLINK-12692

Best
Yun Tang




From: Felipe Gutierrez 
Sent: Thursday, October 10, 2019 20:39
To: user 
Subject: Difference between windows in Spark and Flink

Hi all,

I am trying to think about the essential differences between operators in Flink
and Spark, especially when I am using keyed windows followed by a reduce
operation.
In Flink we develop an application that can logically separate these two
operators. Thus after a keyed window I can use .reduce/aggregate/fold/apply()
functions [1].
In Spark we have the window/reduceByKeyAndWindow functions, which appear to me
to be less flexible in the options to use with a keyed window operation [2].
Moreover, when these two applications are deployed in a Flink and a Spark
cluster respectively, what are the differences between their physical operators
running in the cluster?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#windows
[2] 
https://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: Flink restoring a job from a checkpoint

2019-10-08 Thread Yun Tang
Hi Vishwas

This is because Flink's checkpoint mechanism offers you more capabilities. You
can resume from the offset within a specific checkpoint instead of the last
committed offset, not to mention that you also benefit from restoring the last
timer state, operator state and keyed state.

Best
Yun Tang



From: Congxian Qiu 
Sent: Wednesday, October 9, 2019 10:06:12 AM
To: Vishwas Siravara 
Cc: Yun Tang ; user 
Subject: Re: Flink restoring a job from a checkpoint

Hi Vishwas

Currently, Flink can only restore a retained checkpoint or savepoint with the
`-s` parameter [1][2]; otherwise, it will start from scratch.

```
checkpoint---> bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint --> bin/flink run -s :savepointPath [:runArgs]
```

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints

Best,
Congxian


Vishwas Siravara <vsirav...@gmail.com> wrote on Wed, Oct 9, 2019 at 5:07 AM:
Hi Yun,
Thanks for your reply. I do start from GROUP_OFFSETS. Here is the code snippet:

env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")

I have also enabled checkpointing and externalized it to S3.
Why is it not recommended to just restart the job once I cancel it, as long as 
the topology does not change? What is the advantage of explicitly restoring 
from the last checkpoint by passing the -s option to the flink command line if 
it does the same thing? For instance, if 
s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
is my last successful checkpoint, what is the difference between 1 and 2?

1. /usr/mware/flink/bin/flink run -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 8 -cp qa_streaming
2. /usr/mware/flink/bin/flink run -s s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/ -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 4 -cp qa_streaming

Thanks,
Vishwas

On Tue, Oct 8, 2019 at 1:51 PM Yun Tang <myas...@live.com> wrote:
Hi Vishwas

If you did not configure 
org.apache.flink.streaming.connectors.kafka.config.StartupMode, it defaults to 
GROUP_OFFSETS, which means "Start from committed offsets in ZK / Kafka brokers 
of a specific consumer group". You also need to enable checkpointing so that 
the Kafka offsets are committed when a checkpoint completes.

In other words, even if you don't resume from a checkpoint, as long as 
checkpointing was enabled in the previous job and the startup mode is 
GROUP_OFFSETS, you can restore from the last committed offset if the previous 
checkpoint completed [1][2]. However, this is not really recommended; it is 
better to resume from the last checkpoint [3].

[1] 
https://www.slideshare.net/robertmetzger1/clickthrough-example-for-flinks-kafkaconsumer-checkpointing
[2] https://www.ververica.com/blog/kafka-flink-a-practical-how-to
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint


Best
Yun Tang



From: Vishwas Siravara <vsirav...@gmail.com>
Sent: Wednesday, October 9, 2019 0:54
To: user <user@flink.apache.org>
Subject: Flink restoring a job from a checkpoint

Hi guys,
I have a Flink streaming job which streams from a Kafka source. There is no 
state in the job, just a simple filter, map and write to a Kafka sink. Suppose 
I stop my job and then submit it again to the cluster with the same consumer 
group. Will the job restore automatically from the last successful checkpoint, 
since that is the last offset committed to Kafka?

Thanks,
Vishwas


Re: Flink restoring a job from a checkpoint

2019-10-08 Thread Yun Tang
Hi Vishwas

If you did not configure 
org.apache.flink.streaming.connectors.kafka.config.StartupMode, it defaults to 
GROUP_OFFSETS, which means "Start from committed offsets in ZK / Kafka brokers 
of a specific consumer group". You also need to enable checkpointing so that 
the Kafka offsets are committed when a checkpoint completes.

In other words, even if you don't resume from a checkpoint, as long as 
checkpointing was enabled in the previous job and the startup mode is 
GROUP_OFFSETS, you can restore from the last committed offset if the previous 
checkpoint completed [1][2]. However, this is not really recommended; it is 
better to resume from the last checkpoint [3].

[1] 
https://www.slideshare.net/robertmetzger1/clickthrough-example-for-flinks-kafkaconsumer-checkpointing
[2] https://www.ververica.com/blog/kafka-flink-a-practical-how-to
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
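
A minimal plain-Java sketch of the difference described above, under stated 
assumptions: it does not use the Flink or Kafka APIs, and the class and method 
names (OffsetResumeSketch, startFromGroupOffsets, restoreFrom) are hypothetical. 
It models offsets being committed to the consumer group only when a checkpoint 
completes, and shows that a plain resubmission can only resume from the latest 
committed offset, while restoring with `-s` can pick any retained checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: no Flink or Kafka dependency, all names are hypothetical.
final class OffsetResumeSketch {
    private long position = 0;              // next offset the running job would read
    private long committedGroupOffset = 0;  // last offset committed for the consumer group
    private final List<Long> checkpointOffsets = new ArrayList<>();

    /** Simulates consuming a number of records. */
    void read(long records) {
        position += records;
    }

    /** A completed checkpoint stores the offset and commits it to the group. Returns its index. */
    int completeCheckpoint() {
        checkpointOffsets.add(position);
        committedGroupOffset = position;
        return checkpointOffsets.size() - 1;
    }

    /** Plain resubmission with group offsets: resume from the committed group offset. */
    long startFromGroupOffsets() {
        return committedGroupOffset;
    }

    /** Resubmission with -s: resume from the offset stored in the chosen checkpoint. */
    long restoreFrom(int checkpointIndex) {
        return checkpointOffsets.get(checkpointIndex);
    }
}
```

In a real job, restoring with `-s` also brings back timer, operator and keyed 
state, which this toy model leaves out.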


Best
Yun Tang



From: Vishwas Siravara 
Sent: Wednesday, October 9, 2019 0:54
To: user 
Subject: Flink restoring a job from a checkpoint

Hi guys,
I have a Flink streaming job which streams from a Kafka source. There is no 
state in the job, just a simple filter, map and write to a Kafka sink. Suppose 
I stop my job and then submit it again to the cluster with the same consumer 
group. Will the job restore automatically from the last successful checkpoint, 
since that is the last offset committed to Kafka?

Thanks,
Vishwas


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-25 Thread Yun Tang
As Aleksandar said, k8s with an HA configuration could solve your problem. 
There has already been some discussion about how to implement such HA on k8s 
without a ZooKeeper service: FLINK-11105 [1] and FLINK-12884 [2]. Currently, 
ZooKeeper may be your only choice of high-availability service.

[1] https://issues.apache.org/jira/browse/FLINK-11105
[2] https://issues.apache.org/jira/browse/FLINK-12884

Best
Yun Tang

From: Aleksandar Mastilovic 
Sent: Thursday, September 26, 2019 1:57
To: Sean Hester 
Cc: Hao Sun ; Yuval Itzchakov ; user 

Subject: Re: Challenges Deploying Flink With Savepoints On Kubernetes

Can’t you simply use JobManager in HA mode? It would pick up where it left off 
if you don’t provide a Savepoint.

On Sep 25, 2019, at 6:07 AM, Sean Hester <sean.hes...@bettercloud.com> wrote:

thanks for all replies! i'll definitely take a look at the Flink k8s Operator 
project.

i'll try to restate the issue to clarify. this issue is specific to starting a 
job from a savepoint in job-cluster mode. in these cases the Job Manager 
container is configured to run a single Flink job at start-up. the savepoint 
needs to be provided as an argument to the entrypoint. the Flink documentation 
for this approach is here:

https://github.com/apache/flink/tree/master/flink-container/kubernetes#resuming-from-a-savepoint

the issue is that taking this approach means that the job will always start 
from the savepoint provided as the start argument in the Kubernetes YAML. this 
includes unplanned restarts of the job manager, but we'd really prefer any 
unplanned restarts resume from the most recent checkpoint instead of restarting 
from the configured savepoint. so in a sense we want the savepoint argument to 
be transient, only being used during the initial deployment, but this runs 
counter to the design of Kubernetes which always wants to restore a deployment 
to the "goal state" as defined in the YAML.
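
The behaviour being asked for can be sketched as a small selection rule. This 
is only an illustration under stated assumptions: RestorePathSketch and 
chooseRestorePath are hypothetical names, not an existing Flink or Kubernetes 
feature, and how the latest checkpoint would be discovered (for example from an 
HA store) is left open.

```java
// Hypothetical helper, not a Flink or Kubernetes API.
final class RestorePathSketch {
    /**
     * Prefers the latest completed checkpoint and only falls back to the
     * savepoint given in the deployment arguments when no checkpoint exists.
     * Either argument may be null; a null result means "start from scratch".
     */
    static String chooseRestorePath(String latestCheckpoint, String configuredSavepoint) {
        if (latestCheckpoint != null) {
            return latestCheckpoint;
        }
        return configuredSavepoint;
    }
}
```

With this rule the savepoint argument is effectively transient: it is used on 
the initial deployment, when no checkpoint exists yet, and ignored on later 
unplanned restarts.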

i hope this helps. if you want more details please let me know, and thanks 
again for your time.


On Tue, Sep 24, 2019 at 1:09 PM Hao Sun <ha...@zendesk.com> wrote:
I think I overlooked it. Good point. I am using Redis to save the path to my 
savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov <yuva...@gmail.com> wrote:
Hi Hao,

I think he's talking about exactly the use case where the JM/TM restart and 
come back up from the latest savepoint, which might be stale by that time.

On Tue, 24 Sep 2019, 19:24 Hao Sun <ha...@zendesk.com> wrote:
We always make a savepoint before we shut down the job-cluster, so the 
savepoint is always the latest. When we fix a bug or change the job graph, it 
can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM, uncaught 
exception, etc.

Maybe I do not understand your use case well, but I do not see a need to start 
from a checkpoint after a bug fix.
From what I know, you can currently use a checkpoint as a savepoint as well.

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov <yuva...@gmail.com> wrote:
AFAIK there's currently nothing implemented to solve this problem, but a 
possible fix could be implemented on top of 
https://github.com/lyft/flinkk8soperator, which already has a pretty fancy 
state machine for rolling upgrades. I'd love to be involved, as this is an 
issue I've been thinking about as well.

Yuval

On Tue, Sep 24, 2019 at 5:02 PM Sean Hester <sean.hes...@bettercloud.com> wrote:
hi all--we've run into a gap (knowledge? design? tbd?) for our use cases when 
deploying Flink jobs to start from savepoints using the job-cluster mode in 
Kubernetes.

we're running ~15 different jobs, all in job-cluster mode, using a mix of 
Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are all 
long-running streaming jobs, all essentially acting as microservices. we're 
using Helm charts to configure all of our deployments.

we have a number of use cases where we want to restart jobs from a savepoint to 
replay recent events, i.e. when we've enhanced the job logic or fixed a bug. 
but after the deployment we want to have the job resume its "long-running" 
behavior, where any unplanned restarts resume from the latest checkpoint.

the issue we run into is that any obvious/standard/idiomatic Kubernetes 
deployment includes the savepoint argument in the configuration. if the Job 
Manager container(s) have an unplanned restart, when they come back up they 
will start from the savepoint instead of resuming from the latest checkpoint. 
everything is working as configured, but that's not exactly what we want. we 
want the savepoint argument to be transient somehow (only used during the 
initial deployment), but Kubernetes doesn't really support the concept of 
transient configuration.

i can see a couple of poten

Re: Reply: Problem using StateBackend in Flink

2019-09-03 Thread Yun Tang
Hi


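  1.  What is the checkpoint timeout set to (the default 10 min?)? If the 
timeout is too short, checkpoints fail easily.
  2.  Are all subtasks n/a? The checkpoint of a source task does not involve 
RocksDB and behaves the same as with the default MemoryStateBackend, so the 
source task should not fail to complete its checkpoint as well (unless it can 
never acquire the lock in StreamTask because it is always processing elements).
  3.  How is the back pressure of the job? Is there severe back pressure when 
using RocksDB? If the job is back pressured, barriers cannot flow downstream, 
which easily causes checkpoint timeouts.

I suggest sharing the checkpoint information from the job's web UI.

Best
Yun Tang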

From: Wesley Peng 
Sent: Tuesday, September 3, 2019 15:44
To: user-zh@flink.apache.org 
Subject: Re: Reply: Problem using StateBackend in Flink



on 2019/9/3 15:38, 守护 wrote:
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late 
> message for now expired checkpoint attempt 3 from 
> 24674987621178ed1a363901acc5b128 of job fd5010cbf20501339f1136600f0709c3.
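> What is this problem?

You can use the IDs of the failed tasks to look up which TaskManager they run 
on. On investigation, they turned out to be on the same machine, and the UI 
showed that this machine received noticeably more data than the others.
So the problem was caused by data skew; at root, it is still a matter of 
insufficient downstream consumption capacity.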

also reference:
https://juejin.im/post/5c374fe3e51d451bd1663756


Re: Re: Deploying on a YARN cluster with Flink 1.8.1 always reports an error

2019-09-01 Thread Yun Tang
Hi

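The attempt to submit the job to 0.0.0.0:8030 happens because the correct YARN 
configuration cannot be found at submission time, so the client falls back to 
the default local port 8030. Check whether environment variables such as 
HADOOP_CONF_DIR or HADOOP_HOME are set correctly. Once they point to the 
directory containing these configuration files, the job can be submitted.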
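
A pre-flight check along these lines can be sketched in plain Java. This is a 
sketch under assumptions, not Flink's or Hadoop's own lookup code: the class 
YarnConfCheck is hypothetical, and the order in which the variables are tried 
(HADOOP_CONF_DIR, YARN_CONF_DIR, then HADOOP_HOME/etc/hadoop) is an assumption 
chosen for illustration. It only tests whether a yarn-site.xml is reachable 
from the environment.

```java
import java.io.File;
import java.util.Map;

// Hypothetical helper; the lookup order is an assumption made for this sketch.
final class YarnConfCheck {
    /** Returns the first candidate directory that contains yarn-site.xml, or null if none does. */
    static String findYarnConfDir(Map<String, String> env) {
        String hadoopHome = env.get("HADOOP_HOME");
        String[] candidates = {
            env.get("HADOOP_CONF_DIR"),
            env.get("YARN_CONF_DIR"),
            hadoopHome == null ? null : hadoopHome + "/etc/hadoop"
        };
        for (String dir : candidates) {
            if (dir != null && new File(dir, "yarn-site.xml").isFile()) {
                return dir;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String dir = findYarnConfDir(System.getenv());
        System.out.println(dir == null
            ? "No yarn-site.xml found via the environment; expect the 0.0.0.0:8030 default"
            : "YARN configuration found in " + dir);
    }
}
```

Running the check on the machine that submits the job, with the same 
environment as the Flink client, shows quickly whether the 0.0.0.0:8030 retries 
come from a missing configuration directory.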

BTW, this is not a Flink problem; any big-data compute engine that uses YARN to 
manage jobs can run into it.

Best
Yun Tang

From: 周��岗 
Sent: Sunday, September 1, 2019 15:31
To: user-zh@flink.apache.org 
Subject: Re: Deploying on a YARN cluster with Flink 1.8.1 always reports an error


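I am fairly sure the YARN configuration is basically correct; I don't know why 
Flink keeps connecting to the YARN scheduler via 0.0.0.0.

On 2019-09-01 13:55:50, "周��岗" wrote:
>Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 0 time(s); 
>retry policy is RetryUpToMaximumCountWithFixedSleep
>
>Deploying with 1.7.2 on the same machine works without problems. Could an 
>expert help me see what is wrong?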


Re: Non incremental window function accumulates unbounded state with RocksDb

2019-09-01 Thread Yun Tang
Hi William

I think there might be another possible cause. Since RocksDB can perform 
roughly 10X slower than the heap state backend, have you checked the current 
watermark of the job (from the web UI) to see whether the windows trigger as 
expected, and whether the RocksDB job is back pressured? If state stays in the 
windows without being triggered, the state can grow larger. (However, that 
still seems unable to account for a factor of 400.)

Best
Yun Tang

From: William Jonsson 
Sent: Friday, August 30, 2019 18:22
To: Yun Tang ; user@flink.apache.org 
Subject: Re: Non incremental window function accumulates unbounded state with 
RocksDb


Thanks for your answer Yun.



I agree, I don’t believe that either; however, that is my empirical observation. 
Those statistics are from savepoints. The jobs are running against a production 
Kafka, so no, not exactly the same input. However, these statistics are from 
several runs distributed in time, so they should not contain temporal effects. 
There are no failovers in the pipeline during runtime. Some calculations on the 
size and the pace of the data in the pipeline (how often we receive data and 
how big the data type is) indicate that the buffered data in the windows should 
be a little less than 200 MB, so the heap backend behaves as expected. I agree, 
the space amplification can’t be a factor of 400 and still keep growing for 
RocksDB. I’ve spent some time trying to figure out whether we are doing 
anything obscure, but I can’t find anything. So it would be interesting to hear 
if anyone has had the same experience.



The pipeline is currently running on Flink 1.7.2



Best regards and wish you a pleasant day,

William



From: Yun Tang 
Date: Friday, 30 August 2019 at 11:42
To: William Jonsson , "user@flink.apache.org" 

Cc: Fleet Perception for Maintenance 

Subject: Re: Non incremental window function accumulates unbounded state with 
RocksDb



Hi William



I don't believe the same job would have 70~80GB state for RocksDB while it's 
only 200MB for HeapStateBackend even though RocksDB has some space 
amplification. Are you sure the job received the same input throughput with 
different state backends and they both run well without any failover? Could you 
take a savepoint for the job with different state backends and compare the size 
of the savepoints? What's more, what version of Flink did you use?



Best

Yun Tang



From: William Jonsson 
Sent: Friday, August 30, 2019 17:04
To: user@flink.apache.org 
Cc: Fleet Perception for Maintenance 

Subject: Non incremental window function accumulates unbounded state with 
RocksDb



Hello,

I have a Flink pipeline reading data from Kafka which is keyed (in the 
pseudocode example below it is keyed on the first letter of the string; in our 
pipeline it is keyed on a predefined key) and processed in sliding windows with 
a duration of 60 minutes, sliding every 10 minutes. The time characteristic is 
event time and the windows process the data when they fire; there is no 
incremental processing of the windowed data.

When running with the heap backend the state behaves “normally”, i.e. it stays 
within the data size that is expected when the windows have buffered the data 
(~200 MB for this application) and is bounded to around this size independent 
of the lifetime of the processing pipeline. However, if the state backend is 
changed to the RocksDB backend, the state starts to grow indefinitely (as far 
as we have observed; we haven’t seen it stop growing at least) to 70-80 GB in 
just over a month of runtime.

I made a savepoint of the state, downloaded it and analysed it, which showed 
that the state consisted of data from the whole lifetime of the pipeline, of 
about equal size for each day. I interpret this as the state having accumulated 
old data which should have been deleted when the windows were cleared. 
It is worth noting that the state consists of the input Strings only, so it 
should have nothing to do with the histogram calculation?

I have tried reducing level0_file_num_compaction_trigger to 1, as well as the 
base file size and the target file size, to trigger more compactions in the 
hope that the compactions would remove the obsolete data, but this resulted in 
no improvement at all (it basically got worse).

Please see the pseudocode below for a code example. The pipeline is more 
complex than this and runs on classes other than a String input and a 
“histogram” output class. Do you have any input or ideas on how the state could 
be manageable in the heap case but totally unmanageable with RocksDB?

Best regards,

William



class Histogram extends WindowFunction[String, Histogram, TimeWindow] {

def process (key : T, window: TimeWindow, input : Iterable[String]) = {

 //Calculate the histogram

}

override def apply(key : T, window: TimeWindow, input : Iterable[String], out: 

Re: Non incremental window function accumulates unbounded state with RocksDb

2019-08-30 Thread Yun Tang
Hi William

I don't believe the same job would have 70~80GB state for RocksDB while it's 
only 200MB for HeapStateBackend even though RocksDB has some space 
amplification. Are you sure the job received the same input throughput with 
different state backends and they both run well without any failover? Could you 
take a savepoint for the job with different state backends and compare the size 
of the savepoints? What's more, what version of Flink did you use?

Best
Yun Tang

From: William Jonsson 
Sent: Friday, August 30, 2019 17:04
To: user@flink.apache.org 
Cc: Fleet Perception for Maintenance 

Subject: Non incremental window function accumulates unbounded state with 
RocksDb


Hello,

I have a Flink pipeline reading data from Kafka which is keyed (in the 
pseudocode example below it is keyed on the first letter of the string; in our 
pipeline it is keyed on a predefined key) and processed in sliding windows with 
a duration of 60 minutes, sliding every 10 minutes. The time characteristic is 
event time and the windows process the data when they fire; there is no 
incremental processing of the windowed data.

When running with the heap backend the state behaves “normally”, i.e. it stays 
within the data size that is expected when the windows have buffered the data 
(~200 MB for this application) and is bounded to around this size independent 
of the lifetime of the processing pipeline. However, if the state backend is 
changed to the RocksDB backend, the state starts to grow indefinitely (as far 
as we have observed; we haven’t seen it stop growing at least) to 70-80 GB in 
just over a month of runtime.

I made a savepoint of the state, downloaded it and analysed it, which showed 
that the state consisted of data from the whole lifetime of the pipeline, of 
about equal size for each day. I interpret this as the state having accumulated 
old data which should have been deleted when the windows were cleared. 
It is worth noting that the state consists of the input Strings only, so it 
should have nothing to do with the histogram calculation?

I have tried reducing level0_file_num_compaction_trigger to 1, as well as the 
base file size and the target file size, to trigger more compactions in the 
hope that the compactions would remove the obsolete data, but this resulted in 
no improvement at all (it basically got worse).

Please see the pseudocode below for a code example. The pipeline is more 
complex than this and runs on classes other than a String input and a 
“histogram” output class. Do you have any input or ideas on how the state could 
be manageable in the heap case but totally unmanageable with RocksDB?

Best regards,

William



// Pseudocode: T stands for the key type; numeric settings are as posted.
class Histogram extends WindowFunction[String, Histogram, T, TimeWindow] {

  def process(key: T, window: TimeWindow, input: Iterable[String]): Histogram = {
    // Calculate the histogram
  }

  // Non-incremental: receives all buffered elements when the window fires.
  override def apply(key: T, window: TimeWindow, input: Iterable[String],
                     out: Collector[Histogram]): Unit = {
    out.collect(process(key, window, input))
  }
}

env.getCheckpointConfig.setCheckpointTimeout(40)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(45)

// Second argument enables incremental checkpoints.
val stateBackend: StateBackend = new RocksDBStateBackend("s3://..", true)
env.setStateBackend(stateBackend)
env.enableCheckpointing(90)

val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))

stream
  .keyBy(e => e.charAt(0))
  .timeWindow(Time.minutes(60), Time.minutes(10))
  .apply(new Histogram())
  .name("Pseudocode")
  .uid("Pseudocode")



William Jonsson
Systems Engineer
Fleet Perception for Maintenance
NIRA Dynamics AB
Wallenbergs Gata 4
583 30 Linköping
Sweden  Mobile: +46 722 178 247
william.jons...@niradynamics.se
www.niradynamics.se
Together for smarter safety


Re: best practices on getting flink job logs from Hadoop history server?

2019-08-30 Thread Yun Tang
Hi  Yu

If you have the client-side job log, you can find your application ID in the 
following message:

The Flink YARN client has been started in detached mode. In order to stop Flink 
on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill {appId}
Please also note that the temporary files of the YARN session in the home 
directory will not be removed.
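
As a small illustration, the application ID can be pulled out of such a client 
log with a regular expression. This is a sketch with hypothetical names 
(AppIdFromClientLog, findApplicationId); it assumes the log contains the 
"yarn application -kill" hint quoted above and that the ID has the usual 
application_<clusterTimestamp>_<sequence> form.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for scanning a Flink client log; not part of Flink or YARN.
final class AppIdFromClientLog {
    // Matches the kill hint and captures the YARN application ID that follows it.
    private static final Pattern KILL_HINT =
        Pattern.compile("yarn application -kill (application_\\d+_\\d+)");

    /** Returns the first application ID found in the log text, or null if there is none. */
    static String findApplicationId(String clientLog) {
        Matcher matcher = KILL_HINT.matcher(clientLog);
        return matcher.find() ? matcher.group(1) : null;
    }
}
```

The returned ID can then be passed to "yarn logs" to fetch the aggregated logs 
of that application.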

Best
Yun Tang


From: Zhu Zhu 
Sent: Friday, August 30, 2019 16:24
To: Yu Yang 
Cc: user 
Subject: Re: best practices on getting flink job logs from Hadoop history 
server?

Hi Yu,

Regarding #2,
Currently we search the JM log for the task deployment log, which contains info 
on the container and machine the task is deployed to.

Regarding #3,
You can find the application logs aggregated by machine on DFS; the path 
depends on your YARN config.
Each log may still include multiple TM logs. However, it can be much smaller 
than the log generated by "yarn logs ...".

Thanks,
Zhu Zhu

Yu Yang <yuyan...@gmail.com> wrote on Fri, Aug 30, 2019 at 3:58 PM:
Hi,

We run flink jobs through yarn on hadoop clusters. One challenge that we are 
facing is to simplify flink job log access.

The Flink job logs can be accessed using "yarn logs $application_id". That 
approach has a few limitations:

  1.  It is not straightforward to find yarn application id based on flink job 
id.
  2.  It is difficult to find the corresponding container id for the flink sub 
tasks.
  3.  For jobs that have many tasks, it is inefficient to use "yarn logs ..."  
as it mixes logs from all task managers.

Any suggestions on the best practice for getting the logs of completed Flink 
jobs that ran on YARN?

Regards,
-Yu




Re: checkpoint failure suddenly even state size less than 1 mb

2019-08-30 Thread Yun Tang
Hi Sushant

What confuses me is why the source task cannot complete its checkpoint within 
3 minutes [1]. If no sub-task has ever completed the checkpoint, then even the 
source task cannot complete it, although the source task does not need to 
buffer data. From what I see, it might be affected by acquiring the lock, which 
is held by the stream task's main thread while it processes elements [2]. Could 
you use jstack to capture the threads of your Java process to see what was 
happening when the checkpoint failed?

[1] 
https://github.com/sushantbprise/flink-dashboard/blob/master/failed-checkpointing/state2.png
[2] 
https://github.com/apache/flink/blob/ccc7eb431477059b32fb924104c17af953620c74/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L758
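
The suspected contention can be illustrated with a toy model in plain Java. 
This is a sketch under assumptions, not Flink code: CheckpointLockSketch and 
checkpointAcquiresLock are hypothetical names, a ReentrantLock stands in for 
the checkpoint lock, and a sleeping thread stands in for a main thread that is 
stuck processing an element while holding that lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Toy model of lock contention between element processing and a checkpoint attempt.
final class CheckpointLockSketch {
    /**
     * A "processing" thread holds the lock for holdMs; the caller then tries to take
     * the same lock within timeoutMs. Returns true if the lock was acquired in time.
     */
    static boolean checkpointAcquiresLock(long holdMs, long timeoutMs) {
        ReentrantLock checkpointLock = new ReentrantLock();
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread processing = new Thread(() -> {
            checkpointLock.lock();
            try {
                lockHeld.countDown();
                Thread.sleep(holdMs); // stands in for a long or blocked element-processing call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                checkpointLock.unlock();
            }
        });
        processing.start();
        try {
            lockHeld.await(); // make sure the processing thread owns the lock first
            boolean acquired = checkpointLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
            if (acquired) {
                checkpointLock.unlock();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            processing.interrupt(); // stop the stand-in processing thread
        }
    }
}
```

In a jstack dump of the real job, the corresponding symptom would be a thread 
blocked waiting for a monitor that the task's main thread currently owns.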

Best
Yun Tang

From: Sushant Sawant 
Sent: Tuesday, August 27, 2019 15:01
To: user 
Subject: Re: checkpoint failure suddenly even state size less than 1 mb

Hi team,
Could anyone help or offer a suggestion? We have now stopped all input to 
Kafka; there is no processing and no sink, but checkpointing is still failing.
Is it the case that once a checkpoint fails, it keeps failing until the job is 
restarted?

Help appreciated.

Thanks & Regards,
Sushant Sawant

On 23 Aug 2019 12:56 p.m., "Sushant Sawant" <sushantsawant7...@gmail.com> wrote:
Hi all,
I'm facing two issues, which I believe are correlated.
1. The Kafka source shows high back pressure.
2. Sudden checkpoint failure for an entire day until restart.

My job does the following:
a. Read from Kafka
b. Async I/O to an external system
c. Dump into Cassandra and Elasticsearch

Checkpointing uses the file system.
This Flink job has been proven under high load,
around 5000/sec throughput.
But recently we scaled down the parallelism since there wasn't any load in 
production, and these issues started.

Please find the status shown by the Flink dashboard.
This GitHub folder contains images from when there was high back pressure and 
checkpoint failure:
https://github.com/sushantbprise/flink-dashboard/tree/master/failed-checkpointing
and, after restart, the "everything is fine" images are in this folder:
https://github.com/sushantbprise/flink-dashboard/tree/master/working-checkpointing

--
Could anyone point me in the right direction as to what might have gone wrong, 
or how to troubleshoot this?


Thanks & Regards,
Sushant Sawant



Re: Discussion: resetting Kafka offsets when restoring a job from a checkpoint without clearing Flink state

2019-08-28 Thread Yun Tang
Hi 蒋涛涛

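There is a rather hacky way to do this. In the code, the source finds its state 
by uid in order to restore its offsets. If you do not want the source's state 
to be restored from the checkpoint, you can manually change the uid of the 
source in the code and pass the --allowNonRestoredState option when restoring 
from the checkpoint. The Kafka source then cannot find its source state in the 
restored checkpoint/savepoint and will start from the offsets you configured.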
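
The uid matching behind this trick can be sketched as a toy model in plain 
Java. It is not Flink code: RestoreByUidSketch, its restore method and the uid 
strings in the usage note are hypothetical. It only models that snapshot state 
is looked up by operator uid, and that state with no matching operator is 
rejected unless non-restored state is allowed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of uid-based state matching on restore; all names are hypothetical.
final class RestoreByUidSketch {
    /** Maps each operator uid of the new job to the state found for it in the snapshot, if any. */
    static Map<String, String> restore(Map<String, String> snapshotStateByUid,
                                       Set<String> jobOperatorUids,
                                       boolean allowNonRestoredState) {
        // State in the snapshot that no operator in the new job claims.
        Set<String> unmatched = new HashSet<>(snapshotStateByUid.keySet());
        unmatched.removeAll(jobOperatorUids);
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException("Snapshot state has no matching operator: " + unmatched);
        }
        Map<String, String> restored = new HashMap<>();
        for (String uid : jobOperatorUids) {
            if (snapshotStateByUid.containsKey(uid)) {
                restored.put(uid, snapshotStateByUid.get(uid));
            }
        }
        return restored;
    }
}
```

For example, with a snapshot holding state for "kafka-source" and "window-agg" 
and a job whose source uid was renamed to "kafka-source-v2", only the 
"window-agg" state is restored, and the renamed source starts without state.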

Best
Yun Tang

From: 蒋涛涛 
Sent: Thursday, August 29, 2019 11:45
To: user-zh@flink.apache.org 
Subject: Re: Discussion: resetting Kafka offsets when restoring a job from a checkpoint without clearing Flink state

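Hi Yun Tang,
By default I do want to restore the current Kafka consumption progress from the 
checkpoint. But in special cases I want to start consuming from a certain point 
in time while, as you said, mainly restoring the keyed state. If I call 
setCommitOffsetsOnCheckpoints(false) and set "auto.commit.enable" to false in 
the Kafka properties, no Kafka offsets are committed, so when I stop the job 
normally and restore from a checkpoint, I would not know which point in time to 
consume from.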

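Yun Tang wrote on Thu, Aug 29, 2019 at 10:57 AM:

> Hi 蒋涛涛
>
> Flink's Kafka consumer has three offset commit modes:
>
>   1.  OffsetCommitMode.DISABLED   offset committing is disabled entirely
>   2.  OffsetCommitMode.ON_CHECKPOINTS   Flink's default behaviour: offsets are
> committed to Kafka only when a Flink checkpoint completes
>   3.  OffsetCommitMode.KAFKA_PERIODIC   the default behaviour of Kafka's
> internal client: offsets are committed to Kafka periodically
>
> If you do not want to rely on checkpoints to reset the Kafka offsets, you can
> call setCommitOffsetsOnCheckpoints(false) on FlinkKafkaConsumerBase and set
> "auto.commit.enable" to false in the Kafka properties. This amounts to not
> committing offsets at all. When the job is restored, if you have configured it
> to consume from the latest offsets in Kafka, it can both restore the state in
> the checkpoint (I assume you mainly want to restore the keyed state) and
> consume from the latest offsets.
>
> Best
> Yun Tang
> 
> From: wang jinhai 
> Sent: Thursday, August 29, 2019 10:25
> To: user-zh@flink.apache.org 
> Subject: Re: Discussion: resetting Kafka offsets when restoring a job from a
> checkpoint without clearing Flink state
>
> You could choose to restore from an earlier checkpoint, I suppose.
>
>
> On 2019/8/29 at 10:01 AM, "蒋涛涛" wrote:
>
> Hi everyone:
>
>
> As the subject says: I have found that some data which should not have been
> missed was missed, and I want to replay part of the data. Currently I have to
> clear the state in order to reset the Kafka offsets and rerun. Is it possible
> to keep the Flink job's state, reset the Kafka offsets when restoring the job
> from a checkpoint, and consume from that point in time in Kafka, without
> clearing the state and reprocessing the data?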
>
> Regards,
> JackJiang
>
>


Re: Discussion: resetting Kafka offsets when restoring a job from a checkpoint without clearing Flink state

2019-08-28 Thread Yun Tang
Hi 蒋涛涛

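Flink's Kafka consumer has three offset commit modes:

  1.  OffsetCommitMode.DISABLED   offset committing is disabled entirely
  2.  OffsetCommitMode.ON_CHECKPOINTS   Flink's default behaviour: offsets are 
committed to Kafka only when a Flink checkpoint completes
  3.  OffsetCommitMode.KAFKA_PERIODIC   the default behaviour of Kafka's 
internal client: offsets are committed to Kafka periodically

If you do not want to rely on checkpoints to reset the Kafka offsets, you can 
call setCommitOffsetsOnCheckpoints(false) on FlinkKafkaConsumerBase and set 
"auto.commit.enable" to false in the Kafka properties. This amounts to not 
committing offsets at all. When the job is restored, if you have configured it 
to consume from the latest offsets in Kafka, it can both restore the state in 
the checkpoint (I assume you mainly want to restore the keyed state) and 
consume from the latest offsets.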
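
How the three modes follow from the settings can be sketched in plain Java. 
This is a re-declaration for illustration, under the assumption that the 
connector picks the mode from three booleans as described above; the enum is 
declared locally, and OffsetCommitModeSketch and resolve are hypothetical names 
rather than the connector's own classes.

```java
// Local re-declaration for illustration; not imported from the Flink Kafka connector.
enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

final class OffsetCommitModeSketch {
    /**
     * checkpointingEnabled: whether the job has checkpointing turned on.
     * commitOnCheckpoints:  the value passed to setCommitOffsetsOnCheckpoints(...).
     * kafkaAutoCommit:      the auto commit setting in the Kafka properties.
     */
    static OffsetCommitMode resolve(boolean checkpointingEnabled,
                                    boolean commitOnCheckpoints,
                                    boolean kafkaAutoCommit) {
        if (checkpointingEnabled) {
            // With checkpointing on, the Kafka auto commit setting is not consulted.
            return commitOnCheckpoints ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
        }
        return kafkaAutoCommit ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }
}
```

Under this model, a checkpointed job with setCommitOffsetsOnCheckpoints(false) 
resolves to DISABLED regardless of the Kafka auto commit setting.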

Best
Yun Tang

From: wang jinhai 
Sent: Thursday, August 29, 2019 10:25
To: user-zh@flink.apache.org 
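Subject: Re: Discussion: resetting Kafka offsets when restoring a job from a checkpoint without clearing Flink state

You could choose to restore from an earlier checkpoint, I suppose.


On 2019/8/29 at 10:01 AM, "蒋涛涛" wrote:

Hi everyone:


As the subject says: I have found that some data which should not have been 
missed was missed, and I want to replay part of the data. Currently I have to 
clear the state in order to reset the Kafka offsets and rerun. Is it possible 
to keep the Flink job's state, reset the Kafka offsets when restoring the job 
from a checkpoint, and consume from that point in time in Kafka, without 
clearing the state and reprocessing the data?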

Regards,
JackJiang



Re: StateMigrationException thrown by state processor api

2019-08-27 Thread Yun Tang
Hi Paul

Would you please share more information: the exception stack trace and the 
state descriptor of this map state in that window operator?

For all user-facing keyed state, the namespace serializer is always 
VoidNamespaceSerializer. Only window state can have a different namespace 
serializer.

Best
Yun Tang

From: Paul Lam 
Sent: Tuesday, August 27, 2019 17:14
To: user 
Cc: Tzu-Li (Gordon) Tai 
Subject: StateMigrationException thrown by state processor api

Hi,

I was using the new State Processor API to read a savepoint produced by Flink 
1.5.3, and got a StateMigrationException with the message “For heap backends, 
the new namespace serializer must be compatible”.

Concretely, the state I was trying to read is a MapState within a 
WindowOperator (TriggerContext) which is keyed by a string field, so the key 
and value data types and the corresponding serializers should be the basic ones 
here (PojoSerializer).

However, the error indicates that the problem is with the namespace serializer, 
which I do not know much about. So I dug a bit into the source code and found 
that the namespace serializer is not highly configurable, and both the State 
Processor API and the state to be read seem to use the same namespace 
serializer, `VoidNamespaceSerializer` (please correct me if I’m wrong). That 
still doesn’t explain why the error happened.

Please point me to the right direction. Thanks a lot!

Best,
Paul Lam



Re: Job memory growth

2019-08-25 Thread Yun Tang
hi 张坤

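Are you using RocksDBStateBackend? A kill by the YARN NodeManager for exceeding 
memory is usually caused by native memory overuse. You can add 
-XX:NativeMemoryTracking=detail [1] to the Flink option 
env.java.opts.taskmanager to observe whether memory grows. Also, what memory 
configuration are you using, and what do the YARN logs say at the time of the 
kill? You could consider increasing the requested JVM heap as an indirect way 
to increase the total memory requested from YARN, which can reduce the chance 
of being killed to some extent.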


[1] 
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr007.html

Best
Yun Tang

From: 张坤 
Sent: Monday, August 26, 2019 10:45
To: user-zh@flink.apache.org 
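Subject: Job memory growth

Hi:

Recently I have been submitting jobs to YARN (per job) with Flink (1.7.2). 
After running on YARN for a few hours the job gets killed, and I observe that 
the job's memory keeps growing. Memory parameters are set when the job is 
submitted. The job logic is: simple processing of Kafka data, registering it as 
a table, then window aggregation.

Has anyone run into a similar problem? What is the cause, and how can it be 
solved or optimized? Thanks!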



Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread Yun Tang
Glad to hear this. I really appreciate Gordon and Kurt's drive on this 
release, and thanks to everyone who contributed to it.

Best
Yun Tang

From: Becket Qin 
Sent: Friday, August 23, 2019 0:19
To: 不常用邮箱 
Cc: Yang Wang ; user 
Subject: Re: [ANNOUNCE] Apache Flink 1.9.0 released

Cheers!! Thanks Gordon and Kurt for driving the release!

On Thu, Aug 22, 2019 at 5:36 PM 不常用邮箱 <xu_soft39211...@163.com> wrote:
Good news!

Best.
--
Louis
Email: xu_soft39211...@163.com

On Aug 22, 2019, at 22:10, Yang Wang <danrtsey...@gmail.com> wrote:

Glad to hear that.
Thanks Gordon, Kurt and everyone who had made contributions to the great 
version.


Best,
Yang


Biao Liu <mmyy1...@gmail.com> wrote on Thu, Aug 22, 2019 at 9:33 PM:
Great news!

Thank you, Gordon & Kurt, for being the release managers!
Thanks to all contributors who worked on this release!

Thanks,
Biao /'bɪ.aʊ/



On Thu, 22 Aug 2019 at 21:14, Paul Lam <paullin3...@gmail.com> wrote:
Well done! Thanks to everyone who contributed to the release!

Best,
Paul Lam

Yu Li <car...@gmail.com> wrote on Thu, Aug 22, 2019 at 9:03 PM:
Thanks for the update Gordon, and congratulations!

Great thanks to all for making this release possible, especially to our release 
managers!

Best Regards,
Yu


On Thu, 22 Aug 2019 at 14:55, Xintong Song <tonysong...@gmail.com> wrote:
Congratulations!
Thanks Gordon and Kurt for being the release managers, and thanks all the 
contributors.

Thank you~
Xintong Song


On Thu, Aug 22, 2019 at 2:39 PM Yun Gao <yungao...@aliyun.com> wrote:
 Congratulations!

 Many thanks to Gordon and Kurt for managing the release, and many thanks to 
everyone for the contributions!

  Best,
  Yun



--
From: Zhu Zhu <reed...@gmail.com>
Send Time: 2019 Aug. 22 (Thu.) 20:18
To: Eliza <e...@chinabuckets.com>
Cc: user <user@flink.apache.org>
Subject:Re: [ANNOUNCE] Apache Flink 1.9.0 released

Thanks Gordon for the update.
Congratulations that we have Flink 1.9.0 released!
Thanks to all the contributors.

Thanks,
Zhu Zhu


Eliza <e...@chinabuckets.com> wrote on Thu, Aug 22, 2019 at 8:10 PM:


On 2019/8/22 (Thursday) 8:03 PM, Tzu-Li (Gordon) Tai wrote:
> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.9.0, which is the latest major release.

Congratulations and thanks~

regards.




Re: Reply: Flink waits 10 minutes at startup

2019-08-21 Thread Yun Tang
9,809 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- window: 
(TumblingGroupWindow('w$, 'rowtime, 6.millis)), select: (COUNT(*) AS pv, 
COUNT(DISTINCT curuserid) AS uv, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> select: (pv, uv, 
utc2local(w$end) AS EXPR$2) -> to: Row (1/1) (0071e1c7c4f70646914329f81dcbd349) 
switched from DEPLOYING to RUNNING.
2019-08-21 20:00:39,991 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Print to 
Std. Out (1/4) (0a206444b10553cce42fa34ea0b15497) switched from DEPLOYING to 
RUNNING.
2019-08-21 20:00:39,992 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Print to 
Std. Out (2/4) (a5d1bb759ee6c8001313fe39c7982a8b) switched from DEPLOYING to 
RUNNING.
2019-08-21 20:00:39,996 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Map -> Sink: 
Print to Std. Out (1/4) (39d437bceb033f5bd37747e9142c7f0e) switched from 
DEPLOYING to RUNNING.
2019-08-21 20:00:39,997 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Map -> Sink: 
Print to Std. Out (2/4) (aa4f64edc4f10da2e8bdbf6ebf0aeb83) switched from 
DEPLOYING to RUNNING.
2019-08-21 20:00:40,005 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
Source -> Flat Map -> Filter -> Filter -> Map -> Timestamps/Watermarks -> from: 
(request, curuserid, timelong, rowtime) -> select: (rowtime, 0 AS $f1, 
curuserid) -> time attribute: (rowtime) (4/4) 
(78f520582607e26b365fca483fc98d4c) switched from DEPLOYING to RUNNING.
2019-08-21 20:00:40,006 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
Source -> Flat Map -> Filter -> Filter -> Map -> Timestamps/Watermarks -> from: 
(request, curuserid, timelong, rowtime) -> select: (rowtime, 0 AS $f1, 
curuserid) -> time attribute: (rowtime) (3/4) 
(4756a0450881325af5fb396a18dbb8ae) switched from DEPLOYING to RUNNING.
2019-08-21 20:00:41,605 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 1 @ 1566388841597 for job e5ea83238d721e932b4645589660406c.
2019-08-21 20:00:42,466 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 1 for job e5ea83238d721e932b4645589660406c (7994 bytes in 821 ms).



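-- Original Message --
From: "Yun Tang";
Sent: Thursday, August 22, 2019, 11:08 AM
To: "user-zh";

Subject: Re: Flink startup waits for 10 minutes



Hi

A Flink on YARN job can be slow to start for many reasons, for example waiting 
because resources are insufficient, or containers exiting while they are being 
requested. The default slot request timeout is 5 minutes, so my guess is that 
your job hit a slot request timeout and then requested the slots again. It 
would be best to provide the jobmanager log for further analysis.

Best
Yun Tang

From: 々守护々 <346531...@qq.com>
Sent: Thursday, August 22, 2019 11:04
To: user-zh 
Subject: Flink startup waits for 10 minutes

Hello Flink community experts!
  I am using Flink SQL (Flink 1.8.1), and I adjusted the memory parameters in 
the yarn-site.xml file of our Hadoop cluster:

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>16384</value>
</property>

<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>1024</value>
</property>

<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>8192</value>
</property>


Then I start the Flink on YARN job with these parameters: 
./flink-1.8.1/bin/flink run -m yarn-cluster -p 4 -d 
-c streaming.StreamKafkaMain -yn 4 -ys 2 -yjm 2048 -ytm 8192  -ynm bigdata6 
./flink_bigdata6.jar
It does eventually start successfully, but startup takes 10 minutes, which is 
too long. Is there any way to fix this?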


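Re: Flink startup waits for 10 minutes

2019-08-21 Thread Yun Tang
Hi

A Flink on YARN job can be slow to start for many reasons, for example waiting 
because resources are insufficient, or containers exiting while they are being 
requested. The default slot request timeout is 5 minutes, so my guess is that 
your job hit a slot request timeout and then requested the slots again. It 
would be best to provide the jobmanager log for further analysis.

Best
Yun Tang

From: 々守护々 <346531...@qq.com>
Sent: Thursday, August 22, 2019 11:04
To: user-zh 
Subject: Flink startup waits for 10 minutes

Hello Flink community experts!
  I am using Flink SQL (Flink 1.8.1), and I adjusted the memory parameters in 
the yarn-site.xml file of our Hadoop cluster:

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>16384</value>
</property>

<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>1024</value>
</property>

<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>8192</value>
</property>


Then I start the Flink on YARN job with these parameters: 
./flink-1.8.1/bin/flink run -m yarn-cluster -p 4 -d 
-c streaming.StreamKafkaMain -yn 4 -ys 2 -yjm 2048 -ytm 8192  -ynm bigdata6 
./flink_bigdata6.jar
It does eventually start successfully, but startup takes 10 minutes, which is 
too long. Is there any way to fix this?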


Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

2019-08-14 Thread Yun Tang
Congratulations Andrey.

Best
Yun Tang

From: Xintong Song 
Sent: Wednesday, August 14, 2019 21:40
To: Oytun Tez 
Cc: Zili Chen ; Till Rohrmann ; dev 
; user 
Subject: Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

Congratulations Andrey~!


Thank you~

Xintong Song


On Wed, Aug 14, 2019 at 3:31 PM Oytun Tez 
<oy...@motaword.com> wrote:
Congratulations Andrey!

I am glad the Flink committer team is growing at such a pace!

---
Oytun Tez

M O T A W O R D
The World's Fastest Human Translation Platform.
oy...@motaword.com - www.motaword.com


On Wed, Aug 14, 2019 at 9:29 AM Zili Chen 
<wander4...@gmail.com> wrote:
Congratulations Andrey!

Best,
tison.


On Wed, Aug 14, 2019 at 9:26 PM, Till Rohrmann <trohrm...@apache.org> wrote:
Hi everyone,

I'm very happy to announce that Andrey Zagrebin accepted the offer of the Flink 
PMC to become a committer of the Flink project.

Andrey has been an active community member for more than 15 months. He has 
helped shape numerous features such as State TTL, the FRocksDB release, the 
Shuffle service abstraction, FLIP-1, result partition management, and various 
fixes/improvements. He also frequently helps out on the user@f.a.o mailing 
list.

Congratulations Andrey!

Best, Till
(on behalf of the Flink PMC)


Re: Flink 1.8: Using the RocksDB state backend causes "NoSuchMethodError" when trying to stop a pipeline

2019-08-13 Thread Yun Tang
Hi Tobias

First of all, I don't think you need to ADD the flink-statebackend-rocksdb 
jar to your Docker image's lib folder, as the flink-dist jar in the lib folder 
already includes all classes of flink-statebackend-rocksdb.

I think the root cause is that your user application jar bundles the 
rocksdbjni jar, which was rocksdbjni-5.7.5.jar in Flink 1.7. Flink loads 
classes from the user code jar first [1], and the method 
org.rocksdb.ColumnFamilyHandle.getDescriptor() does not exist in 
rocksdbjni-5.7.5.jar, only in rocksdbjni-5.17.2 (or, more precisely, 
frocksdbjni-5.17.2-artisans-1.0 in Flink 1.8). That is why you run into this 
NoSuchMethodError.

Unless it is necessary, please do not bundle the rocksdbjni package in your 
user code jar, as flink-dist already provides all the needed classes. Moreover, 
adding the flink-statebackend-rocksdb_2.11 dependency to your pom.xml should be 
enough, as it already includes the rocksdbjni dependency.
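
One way to see which rocksdbjni copy actually wins on a given classpath is a 
small reflection check. The following is only a minimal sketch: the class name 
RocksDbJarCheck and both helper methods are made up for illustration, and it 
assumes it is run with the same classpath (or from inside the same user code) 
as the failing job. It loads the class without initializing it, so no native 
library is touched.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

class RocksDbJarCheck {

    /** Loads a class by name without running its static initializers. */
    private static Class<?> load(String className) throws ClassNotFoundException {
        return Class.forName(className, false, RocksDbJarCheck.class.getClassLoader());
    }

    /** True if the class can be loaded and exposes a public method with this name. */
    static boolean hasPublicMethod(String className, String methodName) {
        try {
            for (Method m : load(className).getMethods()) {
                if (m.getName().equals(methodName)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Location (usually a jar URL) the class was loaded from, or a short note. */
    static String loadedFrom(String className) {
        try {
            CodeSource source = load(className).getProtectionDomain().getCodeSource();
            return source == null
                    ? "(bootstrap or unknown source)"
                    : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(class not found)";
        }
    }

    public static void main(String[] args) {
        String handle = "org.rocksdb.ColumnFamilyHandle";
        System.out.println("loaded from:       " + loadedFrom(handle));
        System.out.println("has getDescriptor: " + hasPublicMethod(handle, "getDescriptor"));
    }
}
```

If the reported location points into the user application jar and 
getDescriptor is missing, that would be consistent with an old rocksdbjni 
being bundled there.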

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#classloader-resolve-order

Best
Yun Tang


From: Kaymak, Tobias 
Sent: Tuesday, August 13, 2019 21:20
To: user@flink.apache.org 
Subject: Flink 1.8: Using the RocksDB state backend causes "NoSuchMethodError" 
when trying to stop a pipeline

Hi,

I am using Apache Beam 2.14.0 with Flink 1.8.0 and I have included the RocksDB 
dependency in my project's pom.xml as well as baked it into the Dockerfile like 
this:

FROM flink:1.8.0-scala_2.11

ADD --chown=flink:flink 
http://central.maven.org/maven2/org/apache/flink/flink-statebackend-rocksdb_2.11/1.8.0/flink-statebackend-rocksdb_2.11-1.8.0.jar
 /opt/flink/lib/flink-statebackend-rocksdb_2.11-1.8.0.jar


Everything seems to be normal up to the point when I try to stop and cleanly 
shutdown my pipeline. I get the following error:

java.lang.NoSuchMethodError: 
org.rocksdb.ColumnFamilyHandle.getDescriptor()Lorg/rocksdb/ColumnFamilyDescriptor;
at 
org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater(RocksDBOperationUtils.java:160)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:331)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:362)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.dispose(DoFnOperator.java:470)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.tryDisposeAllOperators(StreamTask.java:454)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:337)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

I can cancel my pipeline and snapshotting in general works, however. Flink 
1.7.2 with Beam 2.12.0 did not have any problem, could it be that this is 
caused by the switch to FRocksDb?[0]

Best,
Tobias

[0] 
https://ci.apache.org/projects/flink/flink-docs-stable/release-notes/flink-1.8.html#rocksdb-version-bump-and-switch-to-frocksdb-flink-10471


Re: Restoring a savepoint: besides the command line, can it be done from code?

2019-08-09 Thread Yun Tang
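Hi Zhongfeng

I'm afraid a savepoint cannot be restored from code. Currently there are only 
two places where a savepoint path can be passed in:


  1.  CliFrontendParser#createSavepointRestoreSettings  [1]
  2.  JarRunHandler#getSavepointRestoreSettings [2]

These correspond to command-line submission and web (REST) submission 
respectively. There is no way to issue the restore request from within the job 
code, but as I understand it, REST or web submission should also meet your 
needs.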
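
For the REST route, the request that ends up in JarRunHandler is a POST to 
/jars/:jarid/run with a JSON body carrying savepointPath. The following is a 
minimal sketch, not a complete client: the class name, the host and port, the 
jar id, and the savepoint path are all placeholders, and it assumes the jar 
has already been uploaded to the cluster.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class SavepointRestoreViaRest {

    /** Builds the run endpoint for an already uploaded jar. */
    static String runJarUrl(String restBase, String jarId) {
        String base = restBase.endsWith("/")
                ? restBase.substring(0, restBase.length() - 1)
                : restBase;
        return base + "/jars/" + jarId + "/run";
    }

    /** Builds the JSON request body; only backslashes and quotes are escaped. */
    static String runJarBody(String savepointPath, boolean allowNonRestoredState) {
        String escaped = savepointPath.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"savepointPath\":\"" + escaped + "\","
                + "\"allowNonRestoredState\":" + allowNonRestoredState + "}";
    }

    /** Sends the request and returns the HTTP status code. */
    static int submit(String url, String jsonBody) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(jsonBody.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) {
        // Placeholder values: adjust to the actual cluster, jar id and savepoint.
        String url = runJarUrl("http://localhost:8081", "example-jar-id.jar");
        String body = runJarBody("hdfs:///flink/savepoints/savepoint-example", false);
        System.out.println("POST " + url);
        System.out.println(body);
        // submit(url, body) would send the request to a running cluster.
    }
}
```

The main method only prints the request so that it can be inspected without a 
cluster; calling submit is left to the caller.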

[1] 
https://github.com/apache/flink/blob/f400fbbe138696e82897941ee012f64c23f7dfcd/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java#L434
[2] 
https://github.com/apache/flink/blob/f400fbbe138696e82897941ee012f64c23f7dfcd/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java#L111


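Best
Yun Tang

From: 戴嘉诚 
Sent: Friday, August 9, 2019 20:54
To: user-zh@flink.apache.org 
Subject: Reply: Restoring a savepoint: besides the command line, can it be done from code?

Hello,
You can call Flink's REST API to take the savepoint.

From: liu zhongfeng
Sent: August 9, 2019, 20:28
To: user-zh@flink.apache.org
Subject: Restoring a savepoint: besides the command line, can it be done from code?

As the subject says: besides running flink -s savepointpath, can a savepoint 
be restored from code? Our company's cluster does not allow entering command 
lines. If it is possible, could you provide a small demo, or point me to the 
API?
Thanks.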

Best,
Rio Liu, 刘中锋




Re: Capping RocksDb memory usage

2019-08-08 Thread Yun Tang
Hi Cam

I think FLINK-7289 [1] might offer you some insights into controlling RocksDB 
memory, especially the idea of using a write buffer manager [2] to control the 
total write buffer memory. If you do not have too many sst files, the write 
buffers would consume much more memory than the index and filter blocks, since 
Flink uses one column family per state and the number of write buffers 
increases as more column families are created.


[1] https://issues.apache.org/jira/browse/FLINK-7289
[2] https://github.com/dataArtisans/frocksdb/pull/4

Best
Yun Tang



From: Cam Mach 
Sent: Thursday, August 8, 2019 21:39
To: Biao Liu 
Cc: miki haiat ; user 
Subject: Re: Capping RocksDb memory usage

Thanks for your response, Biao.



On Wed, Aug 7, 2019 at 11:41 PM Biao Liu 
<mmyy1...@gmail.com> wrote:
Hi Cam,

AFAIK, that's not an easy thing. Actually it's more of a RocksDB issue. There 
is a document explaining the memory usage of RocksDB [1]. It might be helpful.

You could define your own options to tune RocksDB through 
"state.backend.rocksdb.options-factory" [2]. However, I would suggest not doing 
this unless you are very experienced with RocksDB. IMO it's quite complicated.

Meanwhile I can share a bit of experience with this. We have tried putting the 
cache and filter into the block cache before. It's useful for controlling the 
memory usage, but the performance might be affected at the same time. Anyway, 
you could try it and tune it. Good luck!

1. https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
2. 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#tuning-rocksdb

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 11:44 AM Cam Mach 
<cammac...@gmail.com> wrote:
Yes, that is correct.
Cam Mach
Software Engineer
E-mail: cammac...@gmail.com
Tel: 206 972 2768



On Wed, Aug 7, 2019 at 8:33 PM Biao Liu 
<mmyy1...@gmail.com> wrote:
Hi Cam,

Do you mean you want to limit the memory usage of RocksDB state backend?

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 2:23 AM miki haiat 
<miko5...@gmail.com> wrote:
I think using a metrics exporter is the easiest way

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rocksdb


On Wed, Aug 7, 2019, 20:28 Cam Mach 
<cammac...@gmail.com> wrote:
Hello everyone,

What is the easiest and most efficient way to cap RocksDB's memory usage?

Thanks,
Cam



Re: Operator state

2019-08-08 Thread Yun Tang
Hi

When talking about sharing state, broadcast state [1][2] might be a choice.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html#provided-apis
[2] https://flink.apache.org/2019/06/26/broadcast-state.html

Best
Yun Tang



From: Navneeth Krishnan 
Sent: Thursday, August 8, 2019 13:50
To: user 
Subject: Operator state

Hi All,

Is there a way to share operator state among operators? For example, I have an 
operator which has union state and the same state data is required in a 
downstream operator. If not, is there a recommended way to share the state?

Thanks


Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-07 Thread Yun Tang
Congratulations Hequn.

Best
Yun Tang

From: Rong Rong 
Sent: Thursday, August 8, 2019 0:41
Cc: dev ; user 
Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer

Congratulations Hequn, well deserved!

--
Rong

On Wed, Aug 7, 2019 at 8:30 AM <xingc...@gmail.com> 
wrote:

Congratulations, Hequn!



From: Xintong Song <tonysong...@gmail.com>
Sent: Wednesday, August 07, 2019 10:41 AM
To: d...@flink.apache.org
Cc: user <user@flink.apache.org>
Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer



Congratulations~!


Thank you~

Xintong Song





On Wed, Aug 7, 2019 at 4:00 PM vino yang 
<yanghua1...@gmail.com> wrote:

Congratulations!

On Wed, Aug 7, 2019 at 7:09 PM, highfei2...@126.com 
<highfei2...@126.com> wrote:

> Congrats Hequn!
>
> Best,
> Jeff Yang
>
>
>  Original Message 
> Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
> From: Piotr Nowojski
> To: JingsongLee
> CC: Biao Liu ,Zhu Zhu ,Zili Chen ,Jeff Zhang ,Paul Lam ,jincheng sun ,dev 
> ,user
>
>
> Congratulations :)
>
> On 7 Aug 2019, at 12:09, JingsongLee 
> <lzljs3620...@aliyun.com> wrote:
>
> Congrats Hequn!
>
> Best,
> Jingsong Lee
>
> --
> From: Biao Liu <mmyy1...@gmail.com>
> Send Time: Aug. 7, 2019 (Wed.) 12:05
> To: Zhu Zhu <reed...@gmail.com>
> Cc: Zili Chen <wander4...@gmail.com>; Jeff Zhang 
> <zjf...@gmail.com>; Paul
> Lam <paullin3...@gmail.com>; jincheng sun 
> <sunjincheng...@gmail.com>; dev
> <d...@flink.apache.org>; user 
> <user@flink.apache.org>
> Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>
> Congrats Hequn!
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Wed, Aug 7, 2019 at 6:00 PM Zhu Zhu 
> <reed...@gmail.com> wrote:
> Congratulations to Hequn!
>
> Thanks,
> Zhu Zhu
>
> On Wed, Aug 7, 2019 at 5:16 PM, Zili Chen <wander4...@gmail.com> wrote:
> Congrats Hequn!
>
> Best,
> tison.
>
>
> On Wed, Aug 7, 2019 at 5:14 PM, Jeff Zhang <zjf...@gmail.com> wrote:
> Congrats Hequn!
>
> On Wed, Aug 7, 2019 at 5:08 PM, Paul Lam <paullin3...@gmail.com> wrote:
> Congrats Hequn! Well deserved!
>
> Best,
> Paul Lam
>
> On Aug 7, 2019, at 16:28, jincheng sun 
> <sunjincheng...@gmail.com> wrote:
>
> Hi everyone,
>
> I'm very happy to announce that Hequn accepted the offer of the Flink PMC
> to become a committer of the Flink project.
>
> Hequn has been contributing to Flink for many years, mainly working on
> SQL/Table API features. He's also frequently helping out on the user
> mailing lists and helping check/vote the release.
>
> Congratulations Hequn!
>
> Best, Jincheng
> (on behalf of the Flink PMC)
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>


Re: Re:Re: Re: Flink RocksDBStateBackend question

2019-08-06 Thread Yun Tang

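  1.  Your understanding of incremental checkpoints, and of the relationship 
between the taskmanager and RocksDB, is not quite right. 
RocksDBKeyedStateBackend uses RocksDB to store state data; you can think of it 
as part of the taskmanager, so it is really a single-machine concept. During an 
incremental checkpoint, RocksDB flushes all of its data to disk in the 
synchronous phase, and the Flink framework then picks the sst files that have 
not been uploaded before and uploads them to HDFS asynchronously. If local 
recovery is not enabled, a newly started taskmanager downloads the full set of 
data files from HDFS to restore.
  2.  When the job is cancelled, all tasks are cancelled, so 
RocksDBKeyedStateBackend naturally exits as well.

Best
Yun Tang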

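From: lvwenyuan 
Sent: Tuesday, August 6, 2019 14:53
To: user-zh@flink.apache.org 
Subject: Re:Re: Re: Flink RocksDBStateBackend question

Hello Mr. Tang:
 What I meant here is the file system that stores the data at checkpoint time; 
I am using HDFS.
Following your explanation, can I understand it this way (for Flink on YARN 
with RocksDBStateBackend):
   1. During an incremental checkpoint, the taskmanager by default writes the 
data asynchronously to both RocksDB and HDFS (the data is the same).
   When a taskmanager exits abnormally, another taskmanager is started to run 
the task. Will the new taskmanager then sync the data from HDFS?
   If so, what is the point of RocksDB? The data is on HDFS anyway, and HDFS 
itself keeps multiple replicas.
   2. When taking a savepoint, the data is generally stored on HDFS. My command 
is  flink cancel -s  
. The job exits once the savepoint is done, so does RocksDB still need to write 
data at that point? After the savepoint, the whole job is finished.


  Looking forward to your answer. Thank you!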







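At 2019-08-06 13:44:18, "Yun Tang"  wrote:
>@lvwenyuan
>First, one thing needs to be clarified: does "FileSystem" here refer to the 
>file system that stores data at checkpoint time, or to FsStateBackend? Next 
>time, please state the question more clearly before asking.
>
>  *   If you mean the remote file system that stores checkpoint data: with 
> incremental checkpoints, this data is binary-identical to the sst and meta 
> files that RocksDB flushes to local disk when creating the checkpoint; only 
> the file names are changed. With savepoints or full checkpoints, this data 
> is the serialized content of each valid entry in RocksDB.
>  *   If you mean FsStateBackend: for Flink, the stored data is certainly the 
> same logically, otherwise the semantics would be violated. The two differ in 
> storage format, however. The data of the HeapKeyedStateBackend created by 
> FsStateBackend lives entirely on the Java heap; the basic data structures 
> are StateTable[1] and the StateMap[2] that holds the data inside it. The 
> data stored by RocksDB consists mainly of the write buffers (memtables), 
> block cache, and index[3] in RocksDB native memory, plus the immutable sst 
> files already flushed to disk, which are Snappy-compressed by default.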
>
>[1] 
>https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>[2] 
>https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateMap.java
>[3] https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
>
>
>Best
>Yun Tang
>
>
>From: 戴嘉诚 
>Sent: Tuesday, August 6, 2019 12:01
>To: user-zh@flink.apache.org 
>Subject: Reply: Re: Flink RocksDBStateBackend question
>
>
> No. The documentation says that filesystem keeps in-flight data in the TM's 
> memory and writes it to the file system only when a checkpoint is triggered, 
> whereas rocksdb writes in-flight data directly to RocksDB, so it apparently 
> does not take up the running TM's memory.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#the-fsstatebackend
>
>`The FsStateBackend holds in-flight data in the TaskManager’s memory. Upon 
>checkpointing, it writes state snapshots into files in the configured file 
>system and directory. Minimal metadata is stored in the JobManager’s memory 
>(or, in high-availability mode, in the metadata checkpoint).`
>
>From: athlon...@gmail.com
>Sent: August 6, 2019, 11:53
>To: user-zh
>Subject: Re: Re: Flink RocksDBStateBackend question
>
>What you describe is memsystem, whose state data is kept in JM memory; 
>filesystem stores it on the file system.
>
>
>
>athlon...@gmail.com
>From: 戴嘉诚
>Sent: 2019-08-06 11:42
>To: user-zh
>Subject: Re: Flink RocksDBStateBackend question
>As I recall, with FileSystem the stored size cannot exceed the TM's memory (or 
>was it the JM's?), whereas the data stored in RocksDB can be unbounded. 
>Relatively speaking, though, FileSystem's throughput is higher than RocksDB's.
>On Tue, Aug 6, 2019 at 11:39 AM, lvwenyuan  wrote:
>> A question for everyone:
>>With RocksDBStateBackend,
>> is the content stored in RocksDB the same as the data stored on the 
>> FileSystem? If not, what does each contain? Thanks for any answers.
>>
>>
>>
>>
>


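Re: Re: Flink RocksDBStateBackend question

2019-08-05 Thread Yun Tang
@lvwenyuan
First, one thing needs to be clarified: does "FileSystem" here refer to the 
file system that stores data at checkpoint time, or to FsStateBackend? Next 
time, please state the question more clearly before asking.

  *   If you mean the remote file system that stores checkpoint data: with 
incremental checkpoints, this data is binary-identical to the sst and meta 
files that RocksDB flushes to local disk when creating the checkpoint; only 
the file names are changed. With savepoints or full checkpoints, this data is 
the serialized content of each valid entry in RocksDB.
  *   If you mean FsStateBackend: for Flink, the stored data is certainly the 
same logically, otherwise the semantics would be violated. The two differ in 
storage format, however. The data of the HeapKeyedStateBackend created by 
FsStateBackend lives entirely on the Java heap; the basic data structures are 
StateTable[1] and the StateMap[2] that holds the data inside it. The data 
stored by RocksDB consists mainly of the write buffers (memtables), block 
cache, and index[3] in RocksDB native memory, plus the immutable sst files 
already flushed to disk, which are Snappy-compressed by default.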

[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateMap.java
[3] https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB


Best
Yun Tang


From: 戴嘉诚 
Sent: Tuesday, August 6, 2019 12:01
To: user-zh@flink.apache.org 
Subject: Reply: Re: Flink RocksDBStateBackend question


No. The documentation says that filesystem keeps in-flight data in the TM's 
memory and writes it to the file system only when a checkpoint is triggered, 
whereas rocksdb writes in-flight data directly to RocksDB, so it apparently 
does not take up the running TM's memory.

 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#the-fsstatebackend

`The FsStateBackend holds in-flight data in the TaskManager’s memory. Upon 
checkpointing, it writes state snapshots into files in the configured file 
system and directory. Minimal metadata is stored in the JobManager’s memory 
(or, in high-availability mode, in the metadata checkpoint).`

From: athlon...@gmail.com
Sent: August 6, 2019, 11:53
To: user-zh
Subject: Re: Re: Flink RocksDBStateBackend question

What you describe is memsystem, whose state data is kept in JM memory; 
filesystem stores it on the file system.



athlon...@gmail.com
From: 戴嘉诚
Sent: 2019-08-06 11:42
To: user-zh
Subject: Re: Flink RocksDBStateBackend question
As I recall, with FileSystem the stored size cannot exceed the TM's memory (or 
was it the JM's?), whereas the data stored in RocksDB can be unbounded. 
Relatively speaking, though, FileSystem's throughput is higher than RocksDB's.
On Tue, Aug 6, 2019 at 11:39 AM, lvwenyuan  wrote:
> A question for everyone:
>With RocksDBStateBackend,
> is the content stored in RocksDB the same as the data stored on the 
> FileSystem? If not, what does each contain? Thanks for any answers.
>
>
>
>



Re: Memory constrains running Flink on Kubernetes

2019-08-05 Thread Yun Tang
You are correct, the default value of the write buffer size is 64 MB [1]. 
However, the Javadoc for this value is not correct [2]. I have already created 
a PR to fix this.

[1] 
https://github.com/facebook/rocksdb/blob/30edf1874c11762a6cacf4434112ce34d13100d3/include/rocksdb/options.h#L191
[2] 
https://github.com/facebook/rocksdb/blob/30edf1874c11762a6cacf4434112ce34d13100d3/java/src/main/java/org/rocksdb/MutableColumnFamilyOptionsInterface.java#L24

Best
Yun Tang

From: wvl 
Sent: Monday, August 5, 2019 17:55
To: Yu Li 
Cc: Yun Tang ; Yang Wang ; Xintong 
Song ; user 
Subject: Re: Memory constrains running Flink on Kubernetes

Btw, with regard to:

> The default writer-buffer-number is 2 at most for each column family, and the 
> default write-buffer-memory size is 4MB.

This isn't what I see when looking at the OPTIONS-XX file in the rocksdb 
directories in state:

[CFOptions "xx"]
  ttl=0
  report_bg_io_stats=false
  
compaction_options_universal={allow_trivial_move=false;size_ratio=1;min_merge_width=2;max_size_amplification_percent=200;max_merge_width=4294967295;compression_size_percent=-1;stop_style=kCompactionStopStyleTotalSize;}
  table_factory=BlockBasedTable
  paranoid_file_checks=false
  compression_per_level=
  inplace_update_support=false
  soft_pending_compaction_bytes_limit=68719476736
  max_successive_merges=0
  max_write_buffer_number=2
  level_compaction_dynamic_level_bytes=false
  max_bytes_for_level_base=268435456
  optimize_filters_for_hits=false
  force_consistency_checks=false
  disable_auto_compactions=false
  max_compaction_bytes=1677721600
  hard_pending_compaction_bytes_limit=274877906944
  
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;ttl=0;}
  max_bytes_for_level_multiplier=10.00
  level0_file_num_compaction_trigger=4
  level0_slowdown_writes_trigger=20
  compaction_pri=kByCompensatedSize
  compaction_filter=nullptr
  level0_stop_writes_trigger=36
  write_buffer_size=67108864
  min_write_buffer_number_to_merge=1
  num_levels=7
  target_file_size_multiplier=1
  arena_block_size=8388608
  memtable_huge_page_size=0
  bloom_locality=0
  inplace_update_num_locks=1
  memtable_prefix_bloom_size_ratio=0.00
  max_sequential_skip_in_iterations=8
  max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
  compression=kSnappyCompression
  max_write_buffer_number_to_maintain=0
  bottommost_compression=kDisableCompressionOption
  comparator=leveldb.BytewiseComparator
  prefix_extractor=nullptr
  target_file_size_base=67108864
  merge_operator=StringAppendTESTOperator
  memtable_insert_with_hint_prefix_extractor=nullptr
  memtable_factory=SkipListFactory
  compaction_filter_factory=nullptr
  compaction_style=kCompactionStyleLevel

Are these options somehow not applied or overridden?

On Mon, Jul 29, 2019 at 4:42 PM wvl <lee...@gmail.com> 
wrote:
Excellent. Thanks for all the answers so far.

So there was another issue I mentioned which we made some progress gaining 
insight into, namely our metaspace growth when faced with job restarts.

We can easily hit 1 GB of metaspace usage within 15 minutes if we restart often.
We attempted to troubleshoot this issue by looking at all the classes in 
metaspace using `jcmd <pid> GC.class_stats`.

Here we observed that after every job restart another entry is created for 
every class in our job, and the old classes have InstBytes=0. So far so good, 
but the Total column for these entries shows that memory is still being used.
Also, adding up all entries in the Total column indeed corresponds to our 
metaspace usage. So far we could only conclude that none of our job classes 
were being unloaded.

Then we stumbled upon this ticket. Now here are our results running the 
SocketWindowWordCount jar in a flink 1.8.0 cluster with one taskmanager.

We obtain a class count by running jcmd 3052 GC.class_stats | grep -i 
org.apache.flink.streaming.examples.windowing.SessionWindowing | wc -l

First run:
  Class Count: 1
  Metaspace: 30695K

After 800~ runs:
  Class Count: 802
  Metaspace: 39406K
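
As a back-of-the-envelope reading of the two measurements above, the metaspace 
growth per additional class copy can be worked out as follows. This is only a 
sketch of the arithmetic; the class and method names are made up, and it 
assumes the growth is spread evenly over the extra copies.

```java
class MetaspaceGrowth {

    /** Average metaspace growth in K per additional loaded copy of the class. */
    static double growthPerExtraCopyKb(long firstKb, int firstCount,
                                       long laterKb, int laterCount) {
        if (laterCount <= firstCount) {
            throw new IllegalArgumentException("laterCount must exceed firstCount");
        }
        return (double) (laterKb - firstKb) / (laterCount - firstCount);
    }

    public static void main(String[] args) {
        // Figures from the two measurements: 30695K at 1 copy, 39406K at 802 copies.
        double perCopy = growthPerExtraCopyKb(30695, 1, 39406, 802);
        System.out.printf("about %.1fK per extra class copy%n", perCopy);
    }
}
```

With these figures the growth is 8711K over 801 extra copies, so about 10.9K 
per copy for this small example job.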

Interestingly, when we looked a bit later, the class count went down slowly, 
step by step; just to be sure, we used `jcmd <pid> GC.run` to force GC every 
30s or so. If I had to guess, it took about 20 minutes to go from 800~ to 
170~, with metaspace dropping to 35358K. In a sense we've seen this behavior 
before, but with much larger increases in metaspace usage over far fewer job 
restarts.

I've added this information to 
https://issues.apache.org/jira/browse/FLINK-11205.

That said, I'd really like to confirm the following:
- classes should usually only appear once in GC.class_stats output
- flink / the jvm has very slow cleanup of the metaspace
- something clearly is leaking during restarts

On Mon, Jul 29, 2019 at 9:52 AM Yu Li 
<car...@gmail.com> wrote:
For the memory usage of RocksDB, there's already some discussion in 
FLINK-7289<

Re: Savepoint process recovery in Jobmanager HA setup

2019-07-27 Thread Yun Tang
Hi Abhinav

If the leader jobmanager fails during a savepoint, that savepoint would fail, 
and the new jobmanager would then restore from the previous jobgraph with the 
latest completed checkpoint in the high-availability storage. That's why the 
new jobmanager does not know anything about the previous savepoint.


Best
Yun Tang

From: Bajaj, Abhinav 
Sent: Saturday, July 27, 2019 7:25
To: user@flink.apache.org 
Subject: Savepoint process recovery in Jobmanager HA setup


Hi,



I am trying to test a scenario that triggers a savepoint on a Flink 1.7.1 Job 
deployed with jobmanager HA mode.

The purpose is to check if savepoint process recovers if the leader jobmanager 
fails during the savepoint.



During my testing, I found that the new leader jobmanager returns the below 
error for the savepoint trigger request –

{"errors":["Operation not found under key: 
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@e287af3"]}



Does Flink support savepoint process recovery in Jobmanager HA setup?

If yes, can you please suggest how to find the savepoint request?



Appreciate your time and help.



~ Abhinav Bajaj


Re: Re: Flink checkpoint concurrency issue

2019-07-25 Thread Yun Tang
      ++stateSize;
      Entry next = iterator.next();
      if (lasttime >= next.getValue()) {
        iterator.remove();
        --stateSize;
        ++removeState;
      }
    }
    if (stateSize == 0) {
      accumulateStateMap.clear();
    }
    // delete this timer
    ctx.timerService().deleteEventTimeTimer(timestamp);
  }
})
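
The pruning loop above removes entries through the iterator while walking the 
map. As a minimal standalone sketch of that pattern, with a plain 
java.util.Map standing in for the keyed MapState (the class name, method name, 
and sample keys are made up for illustration):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class ExpiredEntryPruner {

    /**
     * Removes every entry whose timestamp is at or before the cutoff and
     * returns how many entries were removed. Removal goes through the
     * iterator, so the map is never modified behind the iterator's back.
     */
    static int pruneUpTo(Map<String, Long> lastSeen, long cutoff) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (cutoff >= entry.getValue()) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Long> lastSeen = new HashMap<>();
        lastSeen.put("a", 100L);
        lastSeen.put("b", 200L);
        lastSeen.put("c", 300L);
        int removed = pruneUpTo(lastSeen, 200L);
        System.out.println("removed=" + removed + ", remaining=" + lastSeen.keySet());
    }
}
```

Calling remove on the map itself inside such a loop, instead of on the 
iterator, is the usual way to get a ConcurrentModificationException from a 
HashMap; whether that is related to the exception in this thread is not 
established here.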

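On Thu, Jul 25, 2019 at 8:39 PM, Yun Tang  wrote:

> Hi 戴嘉诚
>
> From the stack trace, the code around line 88 of
> DefaultOperatorStateBackendSnapshotStrategy [1] deep-copies a list state, but
> the state descriptor you told me about is a MapStateDescriptor Long>. The two
> clearly do not match, so please double-check your code. The stack trace shows
> a ConcurrentModification while copying contents during the synchronous phase
> of the checkpoint, which is indeed very strange, so could you please answer
> the following questions:
>
>   *   Does this problem always occur? Does it also occur after a job
> failover or a resubmission?
>   *   Since snapshot/restore of operator state has to be provided by the
> user: if you implemented the ListCheckpointed interface, please share your
> implementations of snapshotState and restoreState (if you implemented the
> CheckpointedFunction interface, please share your implementations of
> snapshotState and initializeState). It would also be best to share the
> declaration of this operator state and the places where it is used.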
>
> [1]
> https://github.com/apache/flink/blob/480875f045a9777877ed1a90f9e0c6e01b7e03c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java#L88
>
> Best
> Yun Tang
> 
> From: 戴嘉诚 
> Sent: Thursday, July 25, 2019 19:26
> To: user-zh 
> Subject: Re: Re: Flink checkpoint concurrency issue
>
> hi
> Hello, I am using Flink 1.8, but built it myself against hadoop
> 2.7.3.2.6.5.0-292. The operator state descriptor is a MapStateDescriptor.
> Thanks!
>
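> On Thu, Jul 25, 2019 at 7:10 PM, Yun Tang  wrote:
>
> > Hi  all
> >
> > The discussion is drifting further and further off topic. The problem is
> > in the operator state backend; it actually has nothing to do with RocksDB,
> > and should not be related to MaxConcurrentCheckpoints either.
> >
> > To 戴嘉诚
> > Which Flink version are you using? What is the operator state descriptor
> > in this operator?
> >
> > Best
> > Yun Tang
> > 
> > From: 戴嘉诚 
> > Sent: Thursday, July 25, 2019 19:04
> > To: user-zh@flink.apache.org 
> > Subject: Re: Re: Flink checkpoint concurrency issue
> >
> > A window does not fit this scenario, because I need the most recent half
> > hour at every moment, whereas a window only covers fixed time ranges. For
> > example, a window can only count 6:30 to 7:00 and 7:00 to 7:30, while my
> > scenario needs 6:00 to 6:30, 6:02 to 6:32: the range is determined by the
> > time of the data, to get the total count within that range.
> >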
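> > On Thu, Jul 25, 2019 at 6:50 PM, athlon...@gmail.com wrote:
> >
> > > Couldn't you use a window with an evictor?
> > > Something like this, because as I understand it your business
> > > requirement can be implemented that way:
> > >
> > > org.apache.flink.streaming.examples.windowing.TopSpeedWindowing in the
> > > Flink source code
> > >
> > > --
> > > athlon...@gmail.com
> > >
> > >
> > > *From:* 戴嘉诚 
> > > *Sent:* 2019-07-25 18:45
> > > *To:* user-zh 
> > > *Subject:* Re: Re: Flink checkpoint concurrency issue
> > >
> > >
> > >
> > > That probably won't work, because TTL deletes the whole map state,
> > > whereas I need the data volume of the last half hour. So for this map
> > > state, if data keeps being written, I only need to read the most recent
> > > half hour and delete whatever is older from the map. TTL cannot be
> > > applied precisely to individual k-v pairs inside the map.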
> > >
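> > > On Thu, Jul 25, 2019 at 6:40 PM, athlon...@gmail.com  wrote:
> > >
> > > > Actually, you don't need to delete the entries yourself. Try using TTL
> > > > with a shorter timeout.
> > > >
> > > >
> > > >
> > > > athlon...@gmail.com
> > > >
> > > > From: 戴嘉诚
> > > > Sent: 2019-07-25 18:24
> > > > To: user-zh
> > > > Subject: Re: Flink checkpoint concurrency issue
> > > > Hello, I have never set this parameter. For checkpoints, I only enabled
> > > > RocksDB incremental checkpoints and set nothing else.
> > > >
> > > > On Thu, Jul 25, 2019 at 6:20 PM, athlon...@gmail.com  wrote:
> > > >
> > > > > Have you set the setMaxConcurrentCheckpoints parameter?
> > > > >
> > > > >
> > > > >
> > > > > athlon...@gmail.com
> > > > >
> > > > > From: 戴嘉诚
> > > > > Sent: 2019-07-25 18:07
> > > > > To: user-zh
> > > > > Subject: Flink checkpoint concurrency issue
> > > > > Hello everyone:
> > > > >
> > > > > I have a job that frequently fails and restarts at irregular
> > > > > intervals. The exception log is below; roughly, the synchronous part
> > > > > of the checkpoint fails.
> > > > >
> > > > > In this job, the logic checks whether the data volume of the last
> > > > > half hour has reached a configured threshold. I use RocksDB as the
> > > > > backend. A processElement method accumulates the keyed messages in
> > > > > an internal MapState, and for every incoming message it also
> > > > > iterates over the MapState to delete outdated entries.
> > > > >
> > > > > Is this exception caused by the checkpoint asynchronously copying
> > > > > this state to RocksDB while I am deleting from the MapState inside
> > > > > the processElement method?
> > > > >
> > > > >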
> > > > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > > > (16/20).
> > > > >
> > > > >  at
> > > > >
> > > >
> > >
> >
> org.apache.flin

Re: Memory constrains running Flink on Kubernetes

2019-07-25 Thread Yun Tang
Hi

It's definitely not easy to calculate the exact memory usage of RocksDB, but 
the formula "block-cache-memory + column-family-number * write-buffer-memory * 
write-buffer-number + index memory" should give a good enough estimate.
The column-family-number equals the number of your states, which are the 
declared state descriptors in one operator plus potentially one window state 
(if you're using windows).
The default write-buffer-number is at most 2 for each column family, and the 
default write-buffer-memory size is 4MB. Note that if you have configured any 
RocksDB options, the memory usage would differ from these default values.
The last part, index memory, is not easy to estimate, but in my experience it 
does not occupy much memory unless you have many open files.

Last but not least, Flink enables slot sharing by default, so even if you have 
only one slot per taskmanager, there might be many RocksDB instances within 
that TM because many operators with keyed state are running.

Apart from the theoretical analysis, you'd better enable RocksDB native 
metrics or track the memory usage of the pods through Prometheus with k8s.

Best
Yun Tang

From: wvl 
Sent: Thursday, July 25, 2019 17:50
To: Yang Wang 
Cc: Yun Tang ; Xintong Song ; user 

Subject: Re: Memory constrains running Flink on Kubernetes

Thanks for all the answers so far.

Especially clarifying was that RocksDB memory usage isn't accounted for in the 
Flink memory metrics. It's clear that we need to experiment to understand its 
memory usage, and it helps to know that we should be looking at the container 
memory usage minus all the JVM-managed memory.

In the meantime, we've set MaxMetaspaceSize to 200M based on our metrics. Sadly, 
the resulting OOM does not result in a better-behaved job, because it would 
seem that the (taskmanager) JVM itself is not restarted, which makes sense in a 
multi-job environment.
So we're looking into ways to simply prevent this metaspace growth (job library 
jars in /lib on the TM).

Going back to RocksDB, the given formula "block-cache-memory + 
column-family-number * write-buffer-memory * write-buffer-number + index 
memory." isn't completely clear to me.

Block Cache: "Out of box, RocksDB will use LRU-based block cache implementation 
with 8MB capacity"
Index & Filter Cache: "By default index and filter blocks are cached outside of 
block cache, and users won't be able to control how much memory should be use 
to cache these blocks, other than setting max_open_files.". The default 
settings don't set max_open_files and the RocksDB default seems to be 1000 
(https://github.com/facebook/rocksdb/blob/master/include/rocksdb/utilities/leveldb_options.h#L89),
but I'm not completely sure about this.
Write Buffer Memory: "The default is 64 MB. You need to budget for 2 x your 
worst case memory use."

May I presume a unique ValueStateDescriptor equals a Column Family?
If so, say I have 10 of those.
8MB + (10 * 64 * 2) + $Index

So is that correct, and how would one calculate $Index? The docs 
suggest a relationship between max_open_files (1000) and the amount 
of index/filter blocks that can be cached, but is this a 1 to 1 relationship? 
Anyway, this concept of blocks is very unclear.

> Have you ever set the memory limit of your taskmanager pod when launching it 
> in k8s?

Definitely. We settled on 8GB pods with taskmanager.heap.size: 5000m and 1 slot 
and were looking into downsizing a bit to improve our pod to VM ratio.

On Wed, Jul 24, 2019 at 11:07 AM Yang Wang <danrtsey...@gmail.com> wrote:

Hi,


The memory of a Flink TaskManager k8s pod includes the following parts:

  *   jvm heap, limited by -Xmx
  *   jvm non-heap, limited by -XX:MaxMetaspaceSize
  *   jvm direct memory, limited by -XX:MaxDirectMemorySize
  *   native memory, used by rocksdb, just as Yun Tang said, could be limited 
by rocksdb configurations


So if your k8s pod is terminated with OOMKilled, the cause may be the non-heap 
memory or the native memory. I suggest you add an environment variable 
FLINK_ENV_JAVA_OPTS_TM="-XX:MaxMetaspaceSize=512m" in your taskmanager.yaml. 
Then only the native memory could cause an OOM. Leave enough memory for 
RocksDB, and hopefully your job will run smoothly.

On Wed, Jul 24, 2019 at 3:01 PM Yun Tang <myas...@live.com> wrote:
Hi William

Have you ever set the memory limit of your taskmanager pod when launching it in 
k8s? If not, I'm afraid your node might run into node out-of-memory [1]. You 
could increase the limit after analyzing your memory usage.
When talking about the memory usage of RocksDB, a rough formula 
could be: block-cache-memory + column-family-number * write-buffer-memory * 
write-buffer-number + index memory. The block cache and write buffer 
memory are mostly configurable, and the column-family number is 
decided by the state number within

Re: Re: Flink checkpoint concurrency problem

2019-07-25 Thread Yun Tang
Hi 戴嘉诚

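Judging from the exception stack, the code around line 88 of 
DefaultOperatorStateBackendSnapshotStrategy [1] deep-copies a list state, but 
the state descriptor you told me about is a MapStateDescriptor. These two 
clearly do not match, so please double-check your code. The exception is a 
ConcurrentModification raised while copying contents in the synchronous phase 
of the checkpoint, which is indeed very strange, so please also answer the 
following questions:

  *   Does this problem always occur? Does it also occur after the job fails 
over or is resubmitted?
  *   Snapshot/restore of operator state has to be provided by the user. If you 
implement the ListCheckpointed interface, please provide your implementations 
of snapshotState and restoreState (if you implement the CheckpointedFunction 
interface, please provide your implementations of snapshotState and 
initializeState). It would also be best to provide the declaration of this 
operator state and the places where it is used.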

[1] 
https://github.com/apache/flink/blob/480875f045a9777877ed1a90f9e0c6e01b7e03c9/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java#L88

Best
Yun Tang

From: 戴嘉诚 
Sent: Thursday, July 25, 2019 19:26
To: user-zh 
Subject: Re: Re: Flink checkpoint concurrency problem

Hello,
I am using Flink 1.8, but built by myself against Hadoop 2.7.3.2.6.5.0-292. The 
operator state descriptor is a MapStateDescriptor.
Thanks!

On Thu, Jul 25, 2019 at 7:10 PM Yun Tang wrote:

> Hi  all
>
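> Your discussion is drifting further and further off topic. The problem is in the operator state
> backend; it actually has nothing to do with RocksDB, and it should not be related to MaxConcurrentCheckpoints either.
>
> To 戴嘉诚
> Which Flink version are you using? What is the operator state descriptor in this operator?
>
> Best
> Yun Tang
> 
> From: 戴嘉诚 
> Sent: Thursday, July 25, 2019 19:04
> To: user-zh@flink.apache.org 
> Subject: Re: Re: Flink checkpoint concurrency problem
>
> A window does not fit this scenario, because I need the most recent half hour at every moment, while a window can only cover fixed time intervals. For example, a window can only count 6:30 to 7:00 and
> 7:00 to 7:30, whereas my scenario needs 6:00 to 6:30, 6:02 to 6:32, and so on. The interval is determined by the time of the data, in order to count the total within that interval.
>
> On Thu, Jul 25, 2019 at 18:50 athlon...@gmail.com wrote:
>
> > Then can't you use a window with an evictor?
> > Something like this; as I understand your business requirement, it can be implemented that way.
> > See org.apache.flink.streaming.examples.windowing.TopSpeedWindowing in the Flink source code.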
> >
> > --
> > athlon...@gmail.com
> >
> >
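> > *From:* 戴嘉诚 
> > *Sent:* 2019-07-25 18:45
> > *To:* user-zh 
> > *Subject:* Re: Re: Flink checkpoint concurrency problem
> >
> >
> >
> That probably won't work, because this TTL deletes the whole map state, while I need the amount of data within the last half hour. If data keeps being written into this map state, I only need the most recent half hour and delete anything older from the map. This TTL cannot target the individual k-v
> > pairs inside the map.
> >
> > On Thu, Jul 25, 2019 at 6:40 PM athlon...@gmail.com wrote:
> >
> > > Actually, you don't have to delete the entries yourself. Try using TTL with a shorter time.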
> > >
> > >
> > >
> > > athlon...@gmail.com
> > >
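> > > From: 戴嘉诚
> > > Sent: 2019-07-25 18:24
> > > To: user-zh
> > > Subject: Re: Flink checkpoint concurrency problem
> > > Hello, I have never set this parameter. For checkpointing, I only enabled RocksDB incremental checkpoints and set nothing else.
> > >
> > > On Thu, Jul 25, 2019 at 6:20 PM athlon...@gmail.com wrote:
> > >
> > > > Have you ever set the setMaxConcurrentCheckpoints parameter?
> > > >
> > > >
> > > >
> > > > athlon...@gmail.com
> > > >
> > > > From: 戴嘉诚
> > > > Sent: 2019-07-25 18:07
> > > > To: user-zh
> > > > Subject: Flink checkpoint concurrency problem
> > > > Hello everyone,
> > > >
> > > > I have a job here that frequently fails and restarts at irregular times. The exception log is below; roughly, the synchronous part of the checkpoint fails.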
> > > >
> > > >
> > > >
> > >
> >
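> This job checks whether the amount of data in the last half hour has reached a configured threshold. I use RocksDB as the state backend. A processElement method accumulates the keyed messages in a MapState, and for every incoming message it also iterates over the MapState to delete the stale entries.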
> > > >
> > > >
> > > >
> > > >
> > >
> >
> Is this exception caused by the checkpoint asynchronously copying this state to RocksDB while I am deleting entries from the MapState inside processElement?
> > > >
> > > >
> > > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > > (16/20).
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > > >
> > > >  at org.apache.flink.streaming.runtime.io
> > > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> >

Re: Memory constrains running Flink on Kubernetes

2019-07-24 Thread Yun Tang
Hi William

Have you ever set the memory limit of your taskmanager pod when launching it in 
k8s? If not, I'm afraid your node might run into node out-of-memory [1]. You 
could increase the limit after analyzing your memory usage.
When talking about the memory usage of RocksDB, a rough formula 
could be: block-cache-memory + column-family-number * write-buffer-memory * 
write-buffer-number + index memory. The block cache and write buffer 
memory are mostly configurable, and the column-family number is 
determined by the number of states within your operator. The last part, index 
memory, cannot be measured well unless you also cache the index blocks in the 
block cache [2] (but this would impact performance).
If you want the memory stats of RocksDB, turning on the native metrics of 
RocksDB [3] is a good choice.


[1] 
https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/#node-oom-behavior
[2] 
https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#rocksdb-native-metrics

Best
Yun Tang

From: Xintong Song 
Sent: Wednesday, July 24, 2019 11:59
To: wvl 
Cc: user 
Subject: Re: Memory constrains running Flink on Kubernetes

Hi,

Flink acquires these 'Status_JVM_Memory' metrics through the MXBean library. 
According to the MXBean documentation [1], non-heap is "the Java virtual machine manages 
memory other than the heap (referred as non-heap memory)". I'm not sure whether 
that is equivalent to the metaspace. If '-XX:MaxMetaspaceSize' is set, it should 
trigger metaspace cleanup when the limit is reached.

As for RocksDB, it mainly uses non-java memory. Heap, non-heap and direct 
memory can be considered java memory (or at least memory allocated through the 
java process). That means RocksDB is actually using memory that is 
counted in the total K8s container memory but not in any of 
java heap / non-heap / direct memory, which in your case is the 1GB unaccounted for. 
To leave more memory for RocksDB, you need to either configure more memory for 
the K8s containers, or configure less java memory through the config option 
'taskmanager.heap.size'.

The config option 'taskmanager.heap.size', despite the 'heap' in its key, also 
accounts for network memory (which uses direct buffers). Currently, memory 
configuration in Flink is quite complicated and confusing. The community is 
aware of this, and is planning an overall improvement.

To my understanding, once you set '-XX:MaxMetaspaceSize', there should be 
limits on heap, non-heap and direct memory in the JVM. You should be able to find 
which part requires more memory than its limit from the java OOM error 
message. If there is no java OOM but a K8s container OOM, then it should be 
non-java memory used by RocksDB.
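
To see the three JVM-side numbers discussed here outside of Flink's metric system, the same MXBeans can be read directly. This is a minimal sketch using only the standard java.lang.management API; the class name is made up, and it is an assumption that Flink's Heap / NonHeap / Direct metrics are derived from these beans in a similar way. Native memory used by RocksDB does not show up in any of these values.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

public class JvmMemoryReport {

    static long toMb(long bytes) {
        return bytes / (1024L * 1024L);
    }

    /** Bytes currently used by the JVM's "direct" buffer pool (0 if none is reported). */
    static long directMemoryUsedBytes() {
        long used = 0;
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                used += Math.max(0, pool.getMemoryUsed()); // -1 means "not available"
            }
        }
        return used;
    }

    public static void main(String[] args) {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        MemoryUsage nonHeap = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage();
        // getMax() returns -1 when no limit is defined, e.g. non-heap without -XX:MaxMetaspaceSize.
        System.out.println("heap used:     " + toMb(heap.getUsed()) + " MB, max bytes: " + heap.getMax());
        System.out.println("non-heap used: " + toMb(nonHeap.getUsed()) + " MB, max bytes: " + nonHeap.getMax());
        System.out.println("direct used:   " + toMb(directMemoryUsedBytes()) + " MB");
    }
}
```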

[1] 
https://docs.oracle.com/javase/8/docs/api/java/lang/management/MemoryMXBean.html


Thank you~

Xintong Song


On Tue, Jul 23, 2019 at 8:42 PM wvl <lee...@gmail.com> wrote:
Hi,

We're running a relatively simple Flink application that uses a bunch of state 
in RocksDB on Kubernetes.
During the course of development and going to production, we found that we were 
often running into memory issues made apparent by Kubernetes OOMKilled and Java 
OOM log events.

In order to tackle these, we're trying to account for all the memory used in 
the container, to allow proper tuning.
Metric-wise we have:
- container_memory_working_set_bytes = 6,5GB
- flink_taskmanager_Status_JVM_Memory_Heap_Max =  4,7GB
- flink_taskmanager_Status_JVM_Memory_NonHeap_Used = 325MB
- flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed = 500MB

This is my understanding based on all the documentation and observations:
container_memory_working_set_bytes will be the total amount of memory in use, 
disregarding OS page & block cache.
Heap will be heap.
NonHeap is mostly the metaspace.
Direct_Memory is mostly network buffers.

Running the numbers I have 1 GB unaccounted for. I'm also uncertain as to 
RocksDB. According to the docs RocksDB has a "Column Family Write Buffer" where 
"You need to budget for 2 x your worst case memory use".
We have 17 ValueStateDescriptors (ignoring state for windows), each of which 
I'm assuming corresponds to a "Column Family" in RocksDB. Meaning our budget 
should be around 2GB.
Is this accounted for in one of the flink_taskmanager metrics above? We've also 
enabled various rocksdb metrics, but it's unclear where this Write Buffer 
memory would be represented.
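
The two back-of-the-envelope numbers above can be reproduced as follows. This is only a sketch: the class name is made up, 1 GB is treated as 1000 MB, Heap_Max is used as if the heap were fully committed, and the 64 MB write buffer size is an assumption taken from the RocksDB documentation rather than from our actual configuration.

```java
public class ContainerMemoryBudget {

    /** Container working set minus everything the JVM metrics account for, in MB. */
    static long unaccountedMb(long containerMb, long heapMb, long nonHeapMb, long directMb) {
        return containerMb - heapMb - nonHeapMb - directMb;
    }

    /** Worst-case write buffer memory: one set of buffers per column family, in MB. */
    static long writeBufferBudgetMb(int columnFamilies, long writeBufferMb, int buffersPerFamily) {
        return (long) columnFamilies * writeBufferMb * buffersPerFamily;
    }

    public static void main(String[] args) {
        // 6,5GB container, 4,7GB heap max, 325MB non-heap, 500MB direct.
        System.out.println("unaccounted: " + unaccountedMb(6500, 4700, 325, 500) + " MB");
        // 17 ValueStateDescriptors, assumed 64 MB write buffers, budget 2x.
        System.out.println("write buffer budget: " + writeBufferBudgetMb(17, 64, 2) + " MB");
    }
}
```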

Finally, we've seen that when our job has issues and is restarted rapidly, 
NonHeap_Used grows from an initial 50MB to 700MB, before our containers are 
killed. We're assuming this is due to the lack of any form of cleanup in the 
metaspace as classes get (re)loaded.

These are our taskmanager JVM settings: -XX:+UseG1GC -XX:MaxDirectMemorySize=1G 
-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLi

Re: FsStateBackend,hdfs rpc api too much,FileCreated and FileDeleted is for what?

2019-07-23 Thread Yun Tang
Hi Andrew

FilesCreated = CreateFileOps + FsDirMkdirOp. Please refer to [1] and [2] for 
the meaning of these metrics.


[1] 
https://github.com/apache/hadoop/blob/377f95bbe8d2d171b5d7b0bfa7559e67ca4aae46/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java#L178
[2] 
https://github.com/apache/hadoop/blob/377f95bbe8d2d171b5d7b0bfa7559e67ca4aae46/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java#L799

Best
Yun Tang



From: 陈Darling 
Sent: Tuesday, July 23, 2019 11:32
To: qcx978132...@gmail.com 
Cc: user@flink.apache.org ; myas...@live.com 

Subject: Fwd: FsStateBackend,hdfs rpc api too much,FileCreated and FileDeleted 
is for what?

Hi Yun Tang

Your suggestion is very important to us.
Following it, we have advised users to increase the checkpoint 
interval (1 to 5 minutes) and to set state.backend.fs.memory-threshold=10k.

But we only have one HDFS cluster and are trying to reduce the HDFS API calls. 
I don't know whether there is any room for further optimization.

Thank you very much for your patience and help.


Darling
Andrew D.Lin



Begin forwarded message:

From: Congxian Qiu <qcx978132...@gmail.com>
Subject: Re: FsStateBackend,hdfs rpc api too much,FileCreated and FileDeleted is for 
what?
Date: July 23, 2019 at 9:48:05 AM GMT+8
To: 陈Darling <chendonglin...@gmail.com>
Cc: user <user@flink.apache.org>

Hi Andrew

These API calls are for checkpoint files being created/deleted, and there is an 
ongoing issue [1] that aims to reduce their number.
[1] https://issues.apache.org/jira/browse/FLINK-11696

Best,
Congxian


On Mon, Jul 22, 2019 at 11:22 PM 陈Darling <chendonglin...@gmail.com> wrote:

Hi

We use 'FsStateBackend' as our state backend.


The following figure shows the frequency of the HDFS API calls.

I don't understand what FilesCreated and FileDeleted are for. Are all of these 
calls necessary?

Is it possible to cut down the unnecessary ones?





[cid:9b42fd64-e726-47f2-a745-092f6a24d62e@namprd14.prod.outlook.com]






Darling
Andrew D.Lin



Re: Can the number of files under the checkpoint folder chk-no be calculated?

2019-07-18 Thread Yun Tang
Hi

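The PR currently attached to [https://issues.apache.org/jira/browse/FLINK-11696] 
is our production code, so you can use it. However, that is not the root cause 
of your current problem; the root cause is that there are too many 
file-creation and file-deletion requests. You could survey the checkpoint 
intervals of your several hundred jobs. Generally an interval of 3~5 min is 
completely sufficient and there is no need to set the interval too small. This 
setting affects the usage of your whole cluster, so when necessary you should 
tell the users the correct configuration.

If you use FsStateBackend, that is already the best option in terms of the 
number of files created in current Flink. The remaining optimizations are to 
reduce unnecessary parallelism and to increase the 
state.backend.fs.memory-threshold parameter (default 1KB, maximum 1MB). This 
parameter has a side effect, though: you may need to increase the jobmanager 
heap size at the same time.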
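
To illustrate why the checkpoint interval matters so much for the namenode, here is a rough calculation. It is only a sketch under loud assumptions: the class name is made up, the 100 jobs and 8 files per checkpoint are illustrative inputs, every checkpoint is assumed to create all of its files anew, and directory creation, deletions and all other RPCs are ignored.

```java
public class CheckpointFileRate {

    /** File creations per hour caused by checkpoints across all jobs. */
    static long createsPerHour(int jobs, int filesPerCheckpoint, long intervalSeconds) {
        return (long) jobs * filesPerCheckpoint * 3600L / intervalSeconds;
    }

    public static void main(String[] args) {
        // Hypothetical cluster: 100 jobs, 8 files per chk-x directory.
        System.out.println("1 min interval: " + createsPerHour(100, 8, 60) + " creates/hour");
        System.out.println("5 min interval: " + createsPerHour(100, 8, 300) + " creates/hour");
    }
}
```

Since every subsumed checkpoint is also deleted, the number of delete requests should shrink by roughly the same factor.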

Best
Yun Tang

From: 陈冬林 <874269...@qq.com>
Sent: Friday, July 19, 2019 9:45
To: user-zh@flink.apache.org 
Subject: Re: Can the number of files under the checkpoint folder chk-no be calculated?


[cid:D24EB2D4-FA6B-41BD-A6B5-265B7E0C259E@360buyAD.local]

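Hello!
I think your analysis makes a lot of sense. createFile and deleteFile are the most frequent call requests on the cluster.

Our small cluster runs nearly a hundred Flink jobs that use FsStateBackend to store state on HDFS. The checkpoint interval is not something we can decide.
Are there other ways to reduce the pressure on the namenode? I looked at your code on GitHub and it is a very good optimization that we could consider trying. Has it been verified in production? I will study your code later.

As for whether using FsStateBackend can reduce the number of checkpoint files, that is a separate topic

Do you have any optimization suggestions on this topic?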




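On July 18, 2019, at 9:34 PM, Yun Tang <myas...@live.com> wrote:

hi

First, you need to determine whether it really is the large number of file 
creations that causes the backlog of namenode RPC responses. There are many 
kinds of RPC requests; for example, every task creating the checkpoint 
directory also sends a large number of RPC requests to the namenode (see 
[https://issues.apache.org/jira/browse/FLINK-11696]). It is also possible that 
your checkpoint interval is too small, so that files are constantly created and 
deleted (subsume old checkpoint). Find the root cause of the NN pressure first.

As for whether using FsStateBackend can reduce the number of checkpoint files, 
that is a separate topic. First, I need to know which state backend you are 
currently using. If it is MemoryStateBackend, the keyed state backend of that 
state backend does not create any files at checkpoint time, so in terms of file 
count it actually puts the least pressure on the NN (better than 
FsStateBackend). Also, what is the parallelism of your job, and how many files 
are there under each checkpoint directory? Reducing the parallelism is one way 
to reduce the number of files. Of course, I think that if MemoryStateBackend 
alone is enough to handle your checkpoint size, you should not run into the 
problem of too many files, unless your checkpoint interval is really too small.

Best
Yun Tang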

From: 陈冬林 <874269...@qq.com>
Sent: Thursday, July 18, 2019 17:49
To: user-zh@flink.apache.org
Cc: myas...@live.com
Subject: Fwd: Can the number of files under the checkpoint folder chk-no be calculated?


Hello Yun Tang,
Can an HDFS-based backend be optimized to reduce the number of small checkpoint files, and thereby the pressure on the namenode?
Currently it affects the namenode RPC responses, with frequent GC and excessive memory usage.

Begin forwarded message:

From: 陈冬林 <874269...@qq.com>
Subject: Can the number of files under the checkpoint folder chk-no be calculated?
Date: July 18, 2019 at 3:21:12 PM GMT+8
To: user-zh@flink.apache.org

[cid:A90251C2-5DED-42D9-AA11-8D9314A2F1B9@360buyAD.local]

state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/1e95606a-8f70-4876-ad6f-95e5cc38af86
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/2a012214-734a-4c2b-804b-d96f4f3dddf8
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/31871f64-7034-4323-9a2e-5e387e61b7c4
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/54c12a36-c121-4fa0-be76-7996946b4beb
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/63a22932-4bce-4531-bc65-a74d403efb91
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/64b10d96-8333-4a7e-87d1-8afe24c7d2df
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/66290710-e619-4ccf-90b6-5f09f89354f8
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/_metadata

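Q1: Is the number of files under the chk folder related to the number of operators and their parallelism? I only know that the _metadata file is used to restore state; what do the other files represent?

Q2: Can these files be merged together?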



Re: Can the number of files under the checkpoint folder chk-no be calculated?

2019-07-18 Thread Yun Tang
hi

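First, you need to determine whether it really is the large number of file 
creations that causes the backlog of namenode RPC responses. There are many 
kinds of RPC requests; for example, every task creating the checkpoint 
directory also sends a large number of RPC requests to the namenode (see 
[https://issues.apache.org/jira/browse/FLINK-11696]). It is also possible that 
your checkpoint interval is too small, so that files are constantly created and 
deleted (subsume old checkpoint). Find the root cause of the NN pressure first.

As for whether using FsStateBackend can reduce the number of checkpoint files, 
that is a separate topic. First, I need to know which state backend you are 
currently using. If it is MemoryStateBackend, the keyed state backend of that 
state backend does not create any files at checkpoint time, so in terms of file 
count it actually puts the least pressure on the NN (better than 
FsStateBackend). Also, what is the parallelism of your job, and how many files 
are there under each checkpoint directory? Reducing the parallelism is one way 
to reduce the number of files. Of course, I think that if MemoryStateBackend 
alone is enough to handle your checkpoint size, you should not run into the 
problem of too many files, unless your checkpoint interval is really too small.

Best
Yun Tang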

From: 陈冬林 <874269...@qq.com>
Sent: Thursday, July 18, 2019 17:49
To: user-zh@flink.apache.org 
Cc: myas...@live.com 
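Subject: Fwd: Can the number of files under the checkpoint folder chk-no be calculated?


Hello Yun Tang,
Can an HDFS-based backend be optimized to reduce the number of small checkpoint files, and thereby the pressure on the namenode?
Currently it affects the namenode RPC responses, with frequent GC and excessive memory usage.

Begin forwarded message:

From: 陈冬林 <874269...@qq.com>
Subject: Can the number of files under the checkpoint folder chk-no be calculated?
Date: July 18, 2019 at 3:21:12 PM GMT+8
To: user-zh@flink.apache.org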

[cid:A90251C2-5DED-42D9-AA11-8D9314A2F1B9@360buyAD.local]

state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/1e95606a-8f70-4876-ad6f-95e5cc38af86
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/2a012214-734a-4c2b-804b-d96f4f3dddf8
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/31871f64-7034-4323-9a2e-5e387e61b7c4
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/54c12a36-c121-4fa0-be76-7996946b4beb
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/63a22932-4bce-4531-bc65-a74d403efb91
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/64b10d96-8333-4a7e-87d1-8afe24c7d2df
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/66290710-e619-4ccf-90b6-5f09f89354f8
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/_metadata

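Q1: Is the number of files under the chk folder related to the number of operators and their parallelism? I only know that the _metadata file is used to restore state; what do the other files represent?

Q2: Can these files be merged together?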



Re: Can the number of files under the checkpoint folder chk-no be calculated?

2019-07-18 Thread Yun Tang
Hi

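For the source code, you can refer to [1]: when 
DefaultOperatorStateBackendSnapshotStrategy finishes, each operator state 
backend produces at most one file.

By merging small state files, I assume you mean FLINK-11937 
<https://issues.apache.org/jira/browse/FLINK-11937>. The merging there means 
that when each RocksDB state backend creates a checkpoint, the serialized 
results of several sst files are written into a single file, up to a certain 
threshold. Because keyed state is relatively large, usually more than one file 
is created per checkpoint.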


[1] 
https://github.com/apache/flink/blob/1ec34249a0303ae64d049d177057ef9b6c413ab5/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java#L179

Best
Yun Tang



From: 陈冬林 <874269...@qq.com>
Sent: Thursday, July 18, 2019 15:34
To: user-zh@flink.apache.org 
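Subject: Fwd: Can the number of files under the checkpoint folder chk-no be calculated?

Thank you for your answer.
Is the number of those files related only to the parallelism of the operators? Does it also depend on things like the number of keys? Is there a concrete formula? I could not find this logic in the source code.

One more question, the most important one: since these files cannot be merged, which files does "merging small state files" refer to?


Best wishes
Andrew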


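> Begin forwarded message:
>
> From: Yun Tang 
> Subject: Re: Can the number of files under the checkpoint folder chk-no be calculated?
> Date: July 18, 2019 at 3:24:57 PM GMT+8
> To: "user-zh@flink.apache.org" 
> Reply-To: user-zh@flink.apache.org
>
> Hi
>
> A1: The number of files under the chk-x folder is related to the number of operators and their parallelism; they are mainly 
> operator state files. For checkpoints, _metadata is only metadata, and the actual operator data is all in the other files.
>
> A2: 
> These files cannot be merged together. _metadata mainly records the file paths, so if you merge the files, the original paths can no longer be found and you cannot restore from the checkpoint.
>
> Best
> Yun Tang
> From: 陈冬林 <874269...@qq.com>
> Sent: Thursday, July 18, 2019 15:21
> To: user-zh@flink.apache.org 
> Subject: Can the number of files under the checkpoint folder chk-no be calculated?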
>
>
>
> state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/1e95606a-8f70-4876-ad6f-95e5cc38af86
> state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/2a012214-734a-4c2b-804b-d96f4f3dddf8
> state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/31871f64-7034-4323-9a2e-5e387e61b7c4
> state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/54c12a36-c121-4fa0-be76-7996946b4beb
> state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/63a22932-4bce-4531-bc65-a74d403efb91
> state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/64b10d96-8333-4a7e-87d1-8afe24c7d2df
> state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/66290710-e619-4ccf-90b6-5f09f89354f8
> state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/_metadata
>
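> Q1: Is the number of files under the chk folder related to the number of operators and their parallelism? I only know that the _metadata file is used to restore state; what do the other files represent?
>
> Q2: Can these files be merged together?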



Re: Can the number of files under the checkpoint folder chk-no be calculated?

2019-07-18 Thread Yun Tang
Hi

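A1: The number of files under the chk-x folder is related to the number of 
operators and their parallelism; they are mainly operator state files. For 
checkpoints, _metadata is only metadata, and the actual operator data is all in 
the other files.

A2: 
These files cannot be merged together. _metadata mainly records the file paths, 
so if you merge the files, the original paths can no longer be found and you 
cannot restore from the checkpoint.

Best
Yun Tang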

From: 陈冬林 <874269...@qq.com>
Sent: Thursday, July 18, 2019 15:21
To: user-zh@flink.apache.org 
Subject: Can the number of files under the checkpoint folder chk-no be calculated?

[cid:A90251C2-5DED-42D9-AA11-8D9314A2F1B9@360buyAD.local]

state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/1e95606a-8f70-4876-ad6f-95e5cc38af86
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/2a012214-734a-4c2b-804b-d96f4f3dddf8
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/31871f64-7034-4323-9a2e-5e387e61b7c4
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/54c12a36-c121-4fa0-be76-7996946b4beb
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/63a22932-4bce-4531-bc65-a74d403efb91
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/64b10d96-8333-4a7e-87d1-8afe24c7d2df
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/66290710-e619-4ccf-90b6-5f09f89354f8
state_checkpoints_dir/2d93ffacbddcf363b960317816566552/chk-2903/_metadata

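Q1: Is the number of files under the chk folder related to the number of operators and their parallelism? I only know that the _metadata file is used to restore state; what do the other files represent?

Q2: Can these files be merged together?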


Re: Flink queryable state - io.netty4 errors

2019-07-07 Thread Yun Tang
Hi Shivam

Does this reproduce every time? Would you please share the full stack trace you 
get with this exception? The task manager log for that value state would also be 
very welcome.

Best
Yun Tang

From: Shivam Dubey 
Sent: Sunday, July 7, 2019 17:35
To: user@flink.apache.org
Subject: Flink queryable state - io.netty4 errors


I am using Flink queryable state client to read a dummy valuestate I created.

The code is quite simple: I just create a stream out of a kafka topic and key it as

inputStream.keyBy(value -> 0L).map(new mapFucntion()).print()

Within the mapFunction I create a value state and declare it as queryable.

Then I try to read it from queryable state client.

While I do,

ValueState value = returnedCompletableFuture.get();

I am getting errors like-

Java.lang.IndexOutOfBoundException: readerIndex(0) + 
writerIndex(4) is greater than writeIndex(0)

Caused at some netty4.io.AbstractBufByte





I am really struggling to have this simple queryable client running and have 
found nothing over the internet to resolve it.


Re: For Flink jobs whose state is not very large but that need frequent checkpoints, should the state backend be file or RocksDB?

2019-07-02 Thread Yun Tang
hi

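First of all, even if you choose the RocksDB state backend, it still has to 
write to HDFS; it is just that with incremental checkpoints enabled, the amount 
of data written to HDFS each time can be reduced.

I think the core of this question is a trade-off. When no checkpoint is 
running, the read/write performance of RocksDBStateBackend is worse than that 
of the purely in-memory FsStateBackend. In the synchronous phase of a 
checkpoint, the RocksDB state backend has to write everything to local disk, 
which may be somewhat slower than FsStateBackend's in-memory operations and 
also affects throughput. In the asynchronous phase of a checkpoint, because the 
RocksDB state backend supports incremental uploads, the pressure on HDFS may be 
lower; at the same time, you can also reduce FsStateBackend's pressure on HDFS 
by enabling compression for FsStateBackend [1].

If you are very sensitive to throughput and the state is small, you can choose 
FsStateBackend; otherwise you should choose RocksDBStateBackend, which avoids 
the risk of OOM.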

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#compression

Best
Yun Tang


From: yeyi9...@sina.com 
Sent: Wednesday, July 3, 2019 11:34
To: user-zh
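Subject: For Flink jobs whose state is not very large but that need frequent checkpoints, should the state backend be file or RocksDB?

For Flink jobs whose state is not very large but that need frequent checkpoints, should the state backend be file or RocksDB? The official docs say RocksDB suits jobs with very large state and may lower throughput. But if I choose file, the pressure on HDFS is high.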


Re: How to track the progress of Flink 1.9

2019-07-01 Thread Yun Tang
hi Luan

You can follow it on the Flink mailing list at 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Features-for-Apache-Flink-1-9-0-td28701.html 
As far as I know, there does not seem to be an overview issue in JIRA that tracks the progress.

Best
Yun Tang

From: Luan Cooper 
Sent: Monday, July 1, 2019 16:17
To: user-zh@flink.apache.org
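Subject: How to track the progress of Flink 1.9

Hi

I read the introduction to the Flink 1.9 features and would like to use them in our existing stream processing project.
However, I searched the official Apache JIRA issues and could not find the JIRA tickets that correspond to the 1.9 feature list.

For example, I am looking for a Flink 1.9 issue that contains the list of all features planned for 1.9,
with the progress of each feature tracked in sub-issues.

Is there such a release tracking issue?

Thanks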


Re: Providing external files to flink classpath

2019-06-28 Thread Yun Tang
Hi Vishwas


  1.  You could use '-yt' to ship the specified files to the class path; please 
refer to [1] for more details.
  2.  If the properties are only loaded on the client side before executing the 
application, you could let your application just read from the local property 
file. Flink supports loading properties with the ParameterTool [2].
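
The second option boils down to reading a .properties file in main() on the client before the job graph is built. A minimal sketch with plain java.util.Properties follows; the class name, the temporary file and the key kafka.topic are made up for illustration. Flink's ParameterTool offers fromPropertiesFile for the same purpose.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class ClientSideProperties {

    /** Reads a .properties file from the local file system of the client. */
    static Properties load(Path file) {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Writes a throwaway file with one hypothetical key and reads it back. */
    static String demo() {
        try {
            Path file = Files.createTempFile("job", ".properties");
            Files.write(file, "kafka.topic=events\n".getBytes(StandardCharsets.ISO_8859_1));
            String topic = load(file).getProperty("kafka.topic");
            Files.delete(file);
            return topic;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // In a real job the path would come from the command line instead of a temp file.
        System.out.println(demo()); // prints "events"
    }
}
```

Values read this way live only in the client JVM, so they would still have to be passed to the operators, for example through constructor arguments.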

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html#usage
[2] 
https://github.com/apache/flink/blob/f1721293b0701d584d42bd68671181e332d2ad04/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java#L120

Best
Yun Tang


From: Vishwas Siravara 
Sent: Saturday, June 29, 2019 0:43
To: user
Subject: Providing external files to flink classpath

Hi ,
I am trying to add external property files to the flink classpath for
my application. These files are not a part of the fat jar. I put them
under the lib folder but flink can't find them. How can I manage
external property files that need to be read by flink?

Thanks,
Vishwas


Re: Re: Flink 1.8 + Hadoop 3.1.2 build problem

2019-06-28 Thread Yun Tang
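Hello

This is because, starting with Flink 1.8, Flink's default build options no 
longer include the Hadoop dependencies. See [1] for more information. In fact, 
the official download page [2] also states that starting with Flink 1.8 the 
shaded-hadoop jars need to be downloaded separately and placed in the lib 
directory.

If you need the shaded-hadoop jar, you can find it in the directory of the built flink-shaded-hadoop submodule.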

[1] https://issues.apache.org/jira/browse/FLINK-11266
[2] https://flink.apache.org/downloads.html

Best
Yun Tang



From: USERNAME 
Sent: Friday, June 28, 2019 15:41
To: user-zh@flink.apache.org
Subject: Re: Flink 1.8 + Hadoop 3.1.2 build problem

Corrected the image content



At 2019-06-28 15:26:57, "USERNAME" wrote:

1. Software versions
Flink 1.8
Hadoop 3.1.2
Apache Maven 3.0.5


2. Steps
>git clone -b release-1.8.0 https://github.com/apache/flink
>cd flink
>mvn clean install -DskipTests -Dhadoop.version=3.1.2


3. Problem
After a successful build, the .flink/build-target/lib directory contains only three files (↓)
-rw-r--r-- 1 flink flink 96049496 Jun 28 15:17 flink-dist_2.11-1.8.0.jar
-rw-rw-r-- 1 flink flink   489884 Jun 19 13:35 log4j-1.2.17.jar
-rw-rw-r-- 1 flink flink 9931 Jun 19 13:35 slf4j-log4j12-1.7.15.jar


The normal build result of Flink 1.7.2 (↓)
-rw-r--r-- 1 flink flink 93445603 Mar 27 22:46 flink-dist_2.11-1.7.2.jar
-rw-r--r-- 1 flink flink   141881 Mar 27 22:44 flink-python_2.11-1.7.2.jar
-rw-r--r-- 1 flink flink 53380671 Mar 27 22:19 flink-shaded-hadoop2-uber-1.7.2.jar
-rw-rw-r-- 1 flink flink   489884 Mar 27 22:16 log4j-1.2.17.jar
-rw-rw-r-- 1 flink flink 9931 Mar 27 22:16 slf4j-log4j12-1.7.15.jar


Has anyone run into this problem?











Re: Question about checkpoint stage size

2019-06-26 Thread Yun Tang
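Hello

The question is a bit broad, because you did not describe the period over which 
you see the checkpoint state size growing. There are several reasons why the 
checkpoint state size can grow:

  1.  The volume of upstream data increases.
  2.  The window is configured with a long duration and has not fired yet, so a lot of data accumulates in the window.
  3.  The type of window determines that a large state size needs to be stored.

You can refer to the community documentation [1] on the storage footprint of 
window state. Also, when the upstream data volume does not change 
significantly, the checkpoint state size should be fairly stable after several 
window periods. Since your observation period is unclear, I can only give 
fairly general advice.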
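
As an illustration of point 3, the documentation in [1] notes that Flink keeps one copy of each element per window it belongs to. The small sketch below (made-up class name, hypothetical window sizes) shows how that multiplies for sliding windows. It applies to windows that buffer their elements; with an incrementally aggregating function only one accumulator per window is kept.

```java
public class WindowStateCopies {

    /** Number of windows each element belongs to; assumes size is a multiple of slide. */
    static long copiesPerElement(long windowSizeMs, long slideMs) {
        return windowSizeMs / slideMs;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        long oneMinute = 60 * 1000L;
        // A tumbling window behaves like a sliding window whose slide equals its size.
        System.out.println("tumbling 10 min:        " + copiesPerElement(tenMinutes, tenMinutes));
        System.out.println("sliding 10 min / 1 min: " + copiesPerElement(tenMinutes, oneMinute));
    }
}
```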

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#useful-state-size-considerations

Best
Yun Tang

From: ReignsDYL <1945627...@qq.com>
Sent: Wednesday, June 26, 2019 14:22
To: user-zh@flink.apache.org
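Subject: Question about checkpoint stage size

Hello everyone, the stream processing pipeline in my project is source(kafka)->filter->keyby->window->aggregate->sink(hbase). I now see that the checkpoint stage
size of the window subtasks keeps growing. What could be the reason?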



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Process Function's timers "postponing"

2019-06-25 Thread Yun Tang
If you are using processing time, one possible way is to track the last 
registered timer in another ValueState. When you register a new timer and find 
that the previous timer stored in the ValueState has a smaller timestamp (T1) 
than the current one (T2), you could call #deleteProcessingTimeTimer(T1). 
After that processing-time timer is deleted, T1 will not trigger any action. You 
could refer to [1] and its usage for similar ideas.
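
The bookkeeping can be modelled in plain Java as below. This is not Flink code, only a sketch of the idea: the class and its two maps are made up, the first map stands in for the per-key ValueState<Long>, and the second stands in for the timer service. In a real process function the marked lines would be calls to ctx.timerService().

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class PostponedTimers {

    // Stand-in for a per-key ValueState<Long> holding the last registered timer.
    private final Map<String, Long> lastTimer = new HashMap<>();
    // Stand-in for the timer service: timestamps still registered per key.
    private final Map<String, Set<Long>> timerService = new HashMap<>();

    void onElement(String key, long now, long delayMs) {
        long newTimer = now + delayMs;
        Long previous = lastTimer.get(key);
        if (previous != null && previous < newTimer) {
            // Would be: ctx.timerService().deleteProcessingTimeTimer(previous)
            timerService.get(key).remove(previous);
        }
        // Would be: ctx.timerService().registerProcessingTimeTimer(newTimer)
        timerService.computeIfAbsent(key, k -> new TreeSet<>()).add(newTimer);
        lastTimer.put(key, newTimer);
    }

    Set<Long> pendingTimers(String key) {
        return timerService.getOrDefault(key, Collections.emptySet());
    }

    public static void main(String[] args) {
        PostponedTimers timers = new PostponedTimers();
        timers.onElement("K", 1000, 3000); // R1 at t1 registers T1 = 4000
        timers.onElement("K", 2000, 3000); // R2 at t2 registers T2 = 5000 and deletes T1
        System.out.println(timers.pendingTimers("K")); // prints [5000]
    }
}
```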


[1] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/CleanupState.java


From: Andrea Spina 
Sent: Tuesday, June 25, 2019 23:40
To: Yun Tang
Cc: user
Subject: Re: Process Function's timers "postponing"

Hi Yun, thank you for your answer. I'm not sure I got your point. My question 
is:
for the same key K, I process two records R1 at t1 and R2 at t2.
When I process R1, I set a timer to be triggered at T1 which is > t2
When I process R2, I set a timer to be triggered at T2 which is > T1, but in 
order to do that, I want to remove the previous timer T1 in order to "postpone" 
the triggering.

In other words, I would like for a single key to be active just one-timer and 
if a new timer is requested the old one should be deleted.

Thank you,

On Tue, Jun 25, 2019 at 17:31 Yun Tang <myas...@live.com> wrote:
Hi Andrea

If my understanding is correct, you just want to know when the timer 
would eventually be deleted. When you register your timer into 'processingTimeTimersQueue' 
(where your timer is stored) at [1], the 'SystemProcessingTimeService' would then 
schedule a runnable TriggerTask after the "postpone" delay at [2]. When the 
scheduled runnable is triggered, it would poll from the 
'processingTimeTimersQueue' [3], which means the timer would finally be removed. 
Hope this helps.

Best
Yun Tang

[1] 
https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
[2] 
https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
[3] 
https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237


From: Andrea Spina <andrea.sp...@radicalbit.io>
Sent: Tuesday, June 25, 2019 2:06
To: user
Subject: Process Function's timers "postponing"

Dear Community,
I am using Flink (processing-time) timers along with a Process Function. What I 
would like to do is to "postpone" any timers already registered for the given 
key. I might process plenty of events in a row (think of it as a session), and 
I want to trigger the computation "just after" this session stops.

I wondered about deleting any existing timers, but AFAIU I need to know the 
previous timer's trigger time, which I guess is not possible for me since I 
use processing-time timers.

I also read [1], but I cannot tell whether it is useful for me; for instance, 
I don't understand what "Since Flink maintains only one timer per key and 
timestamp..." means. Does this imply that a new PT timer will automatically 
overwrite any previously existing one?
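
As far as the quoted sentence goes, timers are deduplicated per key and timestamp: registering the same timestamp twice for a key yields one timer, while a different timestamp yields an additional timer rather than an overwrite. Coalescing exploits this by rounding the target time so that nearby requests share a timestamp. A minimal sketch of that rounding, in plain Java (the class and method names are hypothetical, and one-second granularity is an assumption):

```java
/** Rounds timer target times to full seconds so that nearby requests coalesce. */
class TimerCoalescing {
    /** Returns the trigger time for "now + delay", rounded down to a full second. */
    static long coalesce(long nowMillis, long delayMillis) {
        return ((nowMillis + delayMillis) / 1000) * 1000;
    }

    public static void main(String[] args) {
        // Two requests 300 ms apart map to the same timestamp, hence a single timer.
        System.out.println(coalesce(10_100L, 3_000L)); // 13000
        System.out.println(coalesce(10_400L, 3_000L)); // 13000
        // A request in the next second maps to a different timestamp, hence a second timer.
        System.out.println(coalesce(11_200L, 3_000L)); // 14000
    }
}
```

Coalescing therefore limits the number of timers per key, but it does not postpone or replace a timer that is already registered.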

Thank you for your precious help,

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
--
Andrea Spina
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


--
Andrea Spina
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Re: Process Function's timers "postponing"

2019-06-25 Thread Yun Tang
Hi Andrea

If my understanding is correct, you want to know when a registered timer is 
eventually deleted. When you register a timer, it is stored in 
'processingTimeTimersQueue' at [1], and the 'SystemProcessingTimeService' then 
schedules a runnable TriggerTask to fire after the "postpone" delay at [2]. When 
that runnable fires, it polls the timer from 'processingTimeTimersQueue' [3], 
which is the point at which the timer is finally removed. 
Hope this helps.

Best
Yun Tang

[1] 
https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L208
[2] 
https://github.com/apache/flink/blob/97d28761add07a1c3569254302a1705e8128f91c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L121
[3] 
https://github.com/apache/flink/blob/adb7aab55feca41c4e4a2646ddc8bc272c022098/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L237


From: Andrea Spina 
Sent: Tuesday, June 25, 2019 2:06
To: user
Subject: Process Function's timers "postponing"

Dear Community,
I am using Flink (processing-time) timers along with a Process Function. What I 
would like to do is to "postpone" any timers already registered for the given 
key. I might process plenty of events in a row (think of it as a session), and 
I want to trigger the computation "just after" this session stops.

I wondered about deleting any existing timers, but AFAIU I need to know the 
previous timer's trigger time, which I guess is not possible for me since I 
use processing-time timers.

I also read [1], but I cannot tell whether it is useful for me; for instance, 
I don't understand what "Since Flink maintains only one timer per key and 
timestamp..." means. Does this imply that a new PT timer will automatically 
overwrite any previously existing one?

Thank you for your precious help,

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#timer-coalescing
--
Andrea Spina
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Re: Building some specific modules in flink

2019-06-24 Thread Yun Tang
Hi Syed

You could use 'mvn clean package -pl :flink-streaming-java_2.11 -DskipTests 
-am' to build the flink-streaming-java and flink-runtime modules. If by 'already 
built binary' you mean the flink-dist-*.jar package, this mvn command will 
not update it. As far as I know, a quick solution is to use the 
`jar uf` command [1] to update the dist jar with your changed classes. 
Otherwise, you need to build the flink-dist module from scratch.

[1] https://docs.oracle.com/javase/tutorial/deployment/jar/update.html

Best
Yun Tang


From: syed 
Sent: Tuesday, June 25, 2019 9:14
To: user@flink.apache.org
Subject: Building some specific modules in flink

Hi;
I am trying to modify some core functionality of Flink to get a thorough
understanding of it. I have already built Flink from source, and now I am
looking to build only the few modules I have modified. Is this possible,
or do I have to build Flink in full (all modules) every time? A full build
takes me about 30-35 minutes.

Specifically, I have modified some classes in the *flink-streaming-java* and
*flink-runtime* modules. I am looking to build only these two modules and
integrate them into the already built Flink (all modules). I already tried the
-pl option of mvn; it installs these modules, but the changes are not picked up
by the already built binaries.
Please guide me on how I can do this.
Kind regards;
syed




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Linkage Error RocksDB and flink-1.6.4

2019-06-23 Thread Yun Tang
Hi Andrea

I have not written Scala for a while, but I wonder why you need to 
instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions on the 
JM side. As far as I can see, you could instantiate them on the TM side, like this:


val rocksdbConfig = new OptionsFactory() {
  // Both methods adjust the options passed in, so no RocksDB object
  // has to be created up front on the JobManager.
  // The properties fields are Option values in your code, hence foreach.
  override def createDBOptions(currentOptions: DBOptions): DBOptions = {
    properties.threadNo.foreach(n => currentOptions.setIncreaseParallelism(n))
    currentOptions
  }

  override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = {
    properties.writeBufferSize.foreach(wbs =>
      currentOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))
    currentOptions
  }
}

You just need to serialize the properties to the TMs via the closure. Hope this 
helps.

Best
Yun Tang

From: Andrea Spina 
Sent: Monday, June 24, 2019 2:20
To: user
Subject: Linkage Error RocksDB and flink-1.6.4

Dear community,
I am running a Flink job backed by RocksDB, on Flink 1.6.4 and Scala 2.11. At 
job startup the following exception happens (it is recorded by the Job 
Manager).

Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) previously initiated loading for a different type with name "org/rocksdb/DBOptions"
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)

For this job, I programmatically set some RocksDB options by using the code 
appended below. Can anybody help with this? Thank you so much,
Andrea

import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

object ConfigurableRocksDB {

  lazy val columnOptions = new ColumnFamilyOptions() with Serializable
  lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
  lazy val dbOptions     = new DBOptions() with Serializable

  def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = {
    properties.threadNo.foreach(dbOptions.setIncreaseParallelism)

    properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
    properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
    properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks())
    properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))

    columnOptions.setTableFormatConfig(tableConfig)
    properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm))
    properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc))
    properties.optimizeFilterForHits.foreach(op => if (op) columnOptions.optimizeFiltersForHits())

    val rocksdbConfig = new OptionsFactory() {
      override def createDBOptions(currentOptions: DBOptions): DBOptions = dbOptions
      override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = columnOptions
    }

    val stateBE =
      new RocksDBStateBackend(properties.checkpointDir.get, properties.checkpointIncremental.getOrElse(false))
    stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    stateBE.setOptions(rocksdbConfig)

    stateBE
  }

}

--
Andrea Spina
Head of R @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT


Re: Checkpointing & File stream with

2019-06-18 Thread Yun Tang
Hi Sung

How about using FileProcessingMode.PROCESS_CONTINUOUSLY [1] as the watch type when 
reading data from HDFS? FileProcessingMode.PROCESS_CONTINUOUSLY 
periodically monitors the source, while the default FileProcessingMode.PROCESS_ONCE 
processes the data once and then exits.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/datastream_api.html#data-sources

Best
Yun Tang

From: Sung Gon Yi 
Sent: Tuesday, June 18, 2019 14:13
To: user@flink.apache.org
Subject: Checkpointing & File stream with

Hello,

I am working on joining two streams, one from Kafka and another from a 
(small) file.
Stream processing works well, but checkpointing fails with the message 
below.
The file has fewer than 100 lines, and the file-reading part of the pipeline 
switches to "FINISHED" as soon as the job is deployed.

After that, checkpointing fails with the following message:
——
2019-06-17 20:25:13,575 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
triggering task Source: Custom File Source (1/1) of job 
d26afe055f249c172c1dcb3311508e83 is not in state RUNNING but FINISHED instead. 
Aborting checkpoint.
——

Custom File Source corresponds to the following code:
——

DataStream<String> specificationFileStream = env.readTextFile(specFile);

——

To make checkpointing succeed, I wrote a custom source function that keeps 
running (it mostly sleeps after reading the file). I wonder whether this is the 
correct way.

Sincerely,
Sung Gon



Re: Apache Flink - Question about metric registry and reporter and context information

2019-06-15 Thread Yun Tang
Hi

1) Yes, the metrics reporter is instantiated per task manager; you can refer 
to [1] to confirm.

2) You can get the runtime context by calling #getRuntimeContext() in a 
RichFunction, and then get your metric group from that runtime context. 
The task manager name can be found via #getAllVariables() on the MetricGroup.


[1] 
https://github.com/apache/flink/blob/8558548a37437ab4f8049b82eb07d1b3aa6ed1f5/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java#L139

Best
Yun Tang


From: M Singh 
Sent: Saturday, June 15, 2019 2:13
To: User
Subject: Apache Flink - Question about metric registry and reporter and context 
information

Hi:

I wanted to find out whether the metric reporter and registry are instantiated 
per task manager (which is a single JVM process) or per slot. I believe it is 
per task manager (JVM process), but I just wanted to confirm.

Also, is there a way to access context information (e.g. the task manager name) 
in the metric reporter or registry, just like in the rich function's open method?

Thanks


Re: What happens when: high-availability.storageDir: is not available?

2019-06-13 Thread Yun Tang
Besides the job graph and completed checkpoints, the high availability storage 
directory also stores BLOB data, which is accessed from both jobmanager and 
taskmanager nodes. You can refer to [1] for the BLOB storage architecture.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture

Best
Yun Tang

From: John Smith 
Sent: Thursday, June 13, 2019 21:18
To: Yun Tang
Cc: user
Subject: Re: What happens when: high-availability.storageDir: is not available?

Thanks. Does this folder need to be available to the task nodes as well, or just 
the job nodes?

On Wed., Jun. 12, 2019, 11:56 p.m. Yun Tang <myas...@live.com> wrote:
The high availability storage directory stores the submitted job graph and 
completed checkpoints. If this directory is unavailable 
at initialization, the job would be submitted well. If this directory is 
unavailable when a checkpoint is created, that checkpoint will eventually fail. 
How often this directory is updated depends mainly on the checkpoint 
interval.

Best
Yun Tang

From: John Smith <java.dev@gmail.com>
Sent: Monday, June 10, 2019 23:55
To: user
Subject: Re: What happens when: high-availability.storageDir: is not available?

Or even how often does the master write there?

On Fri, 7 Jun 2019 at 16:16, John Smith <java.dev@gmail.com> wrote:
In an HA setup, what would happen if high-availability.storageDir: was unavailable?


Re: java.lang.NoClassDefFoundError --- FlinkKafkaConsumer

2019-06-13 Thread Yun Tang
Hi Syed

Have you built your user application jar with all its dependencies, as 
[1] suggests, or copied the related connector jar from the flink-home/opt/ folder 
to the flink-home/lib/ folder?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/projectsetup/dependencies.html#appendix-template-for-building-a-jar-with-dependencies

Best
Yun Tang

From: syed 
Sent: Thursday, June 13, 2019 16:28
To: user@flink.apache.org
Subject: java.lang.NoClassDefFoundError --- FlinkKafkaConsumer

hi
I am trying to add a Kafka source to the standard WordCount application
shipped with Flink, so that the application can count words from the Kafka
source. I added the source like this:

 DataStream<String> text;

 if (params.has("topic") && params.has("bootstrap.servers")
         && params.has("zookeeper.connect") && params.has("group.id")) {
     // read from Kafka, starting at the earliest offset
     text = env.addSource(new FlinkKafkaConsumer<String>(params.get("topic"),
             new SimpleStringSchema(), params.getProperties())
             .setStartFromEarliest());
 } else if (params.has("input")) {
     // read the text file from given input path
     text = env.readTextFile(params.get("input"));
 } else {
     System.out.println("Executing WordCount example with default input data set.");
     System.out.println("Use --input to specify file input.");
     // get default test text data
     text = env.fromElements(WordCountData.WORDS);
 }

When running the application I got the following exception

java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer
at WordCount.main(WordCount.java:79)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 15 more

Please guide me how to fix this exception.
Kind Regards;
Syed





Re: What happens when: high-availability.storageDir: is not available?

2019-06-12 Thread Yun Tang
The high availability storage directory stores the submitted job graph and 
completed checkpoints. If this directory is unavailable 
at initialization, the job would be submitted well. If this directory is 
unavailable when a checkpoint is created, that checkpoint will eventually fail. 
How often this directory is updated depends mainly on the checkpoint 
interval.

Best
Yun Tang

From: John Smith 
Sent: Monday, June 10, 2019 23:55
To: user
Subject: Re: What happens when: high-availability.storageDir: is not available?

Or even how often does the master write there?

On Fri, 7 Jun 2019 at 16:16, John Smith <java.dev@gmail.com> wrote:
In an HA setup, what would happen if high-availability.storageDir: was unavailable?


Re: Savepoint status check fails with error Operation not found under key

2019-06-12 Thread Yun Tang
Hi Anaray

Did you use /jobs/:jobid/savepoints/744e6b6488212b80deab51486620e348 to query 
the savepoint status, and does Flink always return the same result when 
you query it again later? Also, have you checked the web UI to see 
whether that savepoint was ever triggered? If possible, search the job 
manager log around the time you triggered the savepoint and share it here.

Best
Yun Tang

From: anaray 
Sent: Wednesday, June 12, 2019 5:35
To: user@flink.apache.org
Subject: Savepoint status check fails with error Operation not found under key

Hi,

I am using Flink 1.7.0, and checking the status of a savepoint fails with
the error

{
    "errors": [
        "Operation not found under key: org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@57b9711e"
    ]
}


I started a savepoint using the /jobs/:jobid/savepoints REST API, which returned
the trigger ID
"request-id": "744e6b6488212b80deab51486620e348". But when I called
/jobs/:jobid/savepoints/:triggerid with that request-id, it failed with
"Operation not found under key".

I referred to the 1.7.0 documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints-triggerid

Please let me know, if you have seen this issue before?

Thanks,
anaray





Re: [External] Flink 1.7.1 on EMR metrics

2019-05-30 Thread Yun Tang
Hi Padarn

If you want to find out why no metrics are being sent, how about using the 
built-in Slf4j reporter [1], which records metrics in the logs?
If you can see the metrics after enabling the Slf4j reporter, you can then 
compare the configurations.

Best
Yun Tang

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter


From: Padarn Wilson 
Sent: Thursday, May 30, 2019 18:20
To: user
Subject: [External] Flink 1.7.1 on EMR metrics

Hello all,

I am trying to run Flink 1.7.1 on EMR and having some trouble with metric 
reporting.

I was using the DataDogHttpReporter, but have also tried the StatsDReporter; 
with both I saw no metrics being collected.

To debug this I implemented my own reporter (based on StatsDReporter) and 
logged the name of the metric being sent:


private void send(final String name, final String value) {
    log.info("STATSD SENDING: ", name, value);
    try {
        String formatted = String.format("%s:%s|g", name, value);
        byte[] data = formatted.getBytes(StandardCharsets.UTF_8);
        socket.send(new DatagramPacket(data, data.length, this.address));
    } catch (IOException e) {
        LOG.error("unable to send packet to statsd at '{}:{}'", address.getHostName(), address.getPort());
    }
}


This code is certainly reached, because in my log I see a lot of this:

2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter - STATSD SENDING:
2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter - STATSD SENDING:
2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter - STATSD SENDING:
2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter - STATSD SENDING:
2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter - STATSD SENDING:
2019-05-30 10:18:40,351 INFO  com.grab.statsd.StatsDReporter - STATSD SENDING:
2019-05-30 10:18:40,352 INFO  com.grab.statsd.StatsDReporter - STATSD SENDING:

As you can see, the name and value of the metric being reported are empty.
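
One possible explanation, assuming `log` is an SLF4J-style logger (its declaration is not shown above): arguments are only substituted where the message contains a `{}` placeholder, so with the message "STATSD SENDING: " the name and value are never printed, even if they are not empty. The sketch below imitates that substitution with a simplified stand-in; `format` is a hypothetical helper, not the SLF4J implementation.

```java
/** Simplified stand-in for "{}"-style log message formatting; for illustration only. */
class PlaceholderDemo {
    /** Replaces each "{}" in the pattern with the next argument; surplus arguments are dropped. */
    static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // no placeholder left, so the remaining arguments are ignored
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        return out.append(pattern.substring(from)).toString();
    }

    public static void main(String[] args) {
        // Without placeholders, only the fixed text is printed.
        System.out.println("[" + format("STATSD SENDING: ", "numRecordsIn", "42") + "]");
        // With placeholders, the name and value appear.
        System.out.println("[" + format("STATSD SENDING: {}={}", "numRecordsIn", "42") + "]");
    }
}
```

If this is the cause, the reporter may be sending real names and values, and the empty log lines would not by themselves show that the metrics are empty.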


And the logs show everything initialized fine with no error:

2019-05-30 10:18:30,342 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl - Configuring stsd with {port=8125, host=127.0.0.1, class=com.grab.statsd.StatsDReporter}.
2019-05-30 10:18:30,344 INFO  com.grab.statsd.StatsDReporter - Configured StatsDReporter with {host:127.0.0.1, port:8125}
2019-05-30 10:18:30,344 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl - Periodically reporting metrics in intervals of 10 SECONDS for reporter stsd of type com.grab.statsd.StatsDReporter.




Has anyone else tried to work with Flink 1.7.1 (the latest version on EMR) and 
metrics on EMR? If so, any pointers as to what could be set up incorrectly?





Re: Problem with the flink metrics Reporter

2019-05-15 Thread Yun Tang
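Hi 嘉诚

I'm not sure which Flink version you are using, but the logic that shows only 
the first part of the host name has always been there, because in most 
scenarios the first part is enough to identify the host. For the 
implementation, see [1] and [2].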
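
To illustrate the behaviour described here, a minimal sketch in plain Java of taking only the first component of a fully qualified host name. The class and method names are hypothetical, and this is a simplified stand-in rather than the code linked in [2].

```java
/** Keeps only the part of a host name before the first dot; for illustration only. */
class ShortHostName {
    /** Returns the first component of a fully qualified name, or the input if it has no dot. */
    static String firstComponent(String fqdn) {
        int dot = fqdn.indexOf('.');
        return dot < 0 ? fqdn : fqdn.substring(0, dot);
    }

    public static void main(String[] args) {
        System.out.println(firstComponent("ambari.host12.yy")); // ambari
        System.out.println(firstComponent("ambari.host02.yy")); // ambari
    }
}
```

Both host names collapse to the same value, which is why records from different machines become indistinguishable when the machines share a first component.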

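Prompted by your report, I created a JIRA issue [3] to track this. The proposed 
fix is to provide a metrics option so that the full hostname can be shown in 
metrics for scenarios like yours.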

Best
Yun Tang


[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java#L365
[2] 
https://github.com/apache/flink/blob/505b54c182867ccac5d1724d72f4085425ac08a8/flink-core/src/main/java/org/apache/flink/util/NetUtils.java#L59
[3] https://issues.apache.org/jira/browse/FLINK-12520

From: 戴嘉诚 
Sent: Wednesday, May 15, 2019 20:24
To: user-zh@flink.apache.org
Subject: Problem with the flink metrics Reporter

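Hi all,
Following the official documentation, I set up a flink metrics reporter and 
loaded Slf4jReporter. The reporter runs fine, but I found a problem.
When the TaskManager prints its metrics, the output is:
ambari.taskmanager.container_e31_1557826320302_0005_01_02.Status.JVM.ClassLoader.ClassesLoaded: 12044
From the source code, the scope format here should be .taskmanager..:

But here is the problem: the host is shown as ambari, whereas the machine name 
configured on my server is the full ambari.host12.yy. Everything after the 
first part of the host name is dropped, so I cannot tell which machine a 
record came from.

Meanwhile, in the logs printed by the JobManager, the full machine name is 
shown, as follows:
ambari.host02.yy.jobmanager.Status.JVM.Memory.NonHeap.Max: -1

How can I get the full machine name in the TaskManager's reporter? Is this a 
bug, or am I using it incorrectly?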


Re: Flink and Prometheus setup in K8s

2019-05-14 Thread Yun Tang
Hi Wouter

I have no idea about question 2. But for question 1, you could try moving the 
steps from the "RUN" phase of your 
https://github.com/mbode/flink-prometheus-example/blob/master/Dockerfile into 
the "command" phase of your k8s deployment YAML, so that they run before the 
cluster is launched in k8s.

Best
Yun Tang


From: Wouter Zorgdrager 
Sent: Monday, May 13, 2019 20:16
To: user
Subject: Flink and Prometheus setup in K8s

Hey all,

I'm working on a deployment setup with Flink and Prometheus on Kubernetes. I'm 
running into the following issues:

1) Is it possible to use the default Flink Docker image [1] and enable the 
Prometheus reporter? Modifying the flink-conf.yaml is easy, but somehow the 
Prometheus reporter jar needs to be moved within the image. This is easy if I 
use my own Dockerfile (as done here [2]), but I prefer using the official one.
2) I can define the jobmanager/taskmanager metric endpoints statically, but 
w.r.t. scaling I prefer to have these resolved/discovered dynamically. Did 
anyone get a working setup on this? I came across this resource for YARN [3], 
is there something similar for Kubernetes? Or are there any other ways of 
configuring Prometheus to pick this up automatically?

Thanks a lot for your help!

Kind regards,
Wouter

[1]: https://hub.docker.com/_/flink/
[2]: https://github.com/mbode/flink-prometheus-example/blob/master/Dockerfile
[3]: https://github.com/eastcirclek/flink-service-discovery


Re: Error at startup when using RocksDB as the state backend

2019-05-10 Thread Yun Tang
Hi

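The root cause is actually the last line, "Caused by: 
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.hadoop.hdfs.protocol.HdfsConstants", which has nothing to do with 
RocksDB. Check whether this class is on the runtime classpath; you can start by 
confirming whether flink-shaded-hadoop2-xx.jar is on your classpath.

Best
Yun Tang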

From: zhang yue 
Sent: Friday, May 10, 2019 14:43
To: user-zh@flink.apache.org
Subject: Error at startup when using RocksDB as the state backend

Flink version 1.7.2


org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 447c14f1bd0382214a420122215f6792)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at feature.flinktask.ActionLog.main(ActionLog.java:188)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was cancelled.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
... 20 more
Caused by: AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Source: Custom Source (1/1).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Source: Custom Source (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://cdh-master2:8020/flink/checkpoints/447c14f1bd0382214a420122215f6792/chk-11/ca615aca-79a0-4af1-b0c1-954585c4c533 in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://cdh-master2:8020/flink/checkpoints/447c14f1bd0382214a420122215f6792/chk-11/ca615aca-79a0-4af1-b0c1-954585c4c533 in order to obtain the stream state handle
at 

Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

2019-05-06 Thread Yun Tang
Hi Averell

Would you please share a screenshot of the job graph in the Flink web UI to 
illustrate the change after you append a map operator?

Best
Yun Tang

From: Le-Van Huyen 
Sent: Monday, May 6, 2019 11:15
To: Yun Tang
Cc: user@flink.apache.org
Subject: Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Thank you Yun.

I haven't yet tried the check you suggested (it would take me some time to 
work out how to do it).
However, I can now confirm that the "union" is the culprit. In my Flink 
Console GUI, I can see that the link from streamC to CEP via "union" is a 
FORWARD link, not a HASH one, which means that having "keyBy" right before the 
"union" has no effect at all. If I put a placebo "map" between the "keyBy" on 
streamC and the "union", the problem is solved: 
.union(streamC.keyBy(r => (r.id1, r.id2)).map(r => r))
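
To make the FORWARD versus HASH difference concrete, here is a simplified model in plain Java of how a key is routed to a subtask. It is a sketch under assumptions: the class and method names are hypothetical, and Math.floorMod on hashCode stands in for the real hashing, which in Flink lives in KeyGroupRangeAssignment and additionally applies a murmur hash.

```java
/** Simplified model of keyBy routing: key -> key group -> subtask index. Not Flink's actual code. */
class KeyPartitionCheck {
    /** Returns the subtask that a hash-partitioned edge would pick for this key. */
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(key.hashCode(), maxParallelism); // stand-in hash
        return keyGroup * parallelism / maxParallelism;
    }

    /** True when a record observed on actualSubtask is where keyBy would have sent it. */
    static boolean isCorrectlyPartitioned(Object key, int actualSubtask, int maxParallelism, int parallelism) {
        return subtaskFor(key, maxParallelism, parallelism) == actualSubtask;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        String key = "id1|id2";
        int expected = subtaskFor(key, maxParallelism, parallelism);
        // A FORWARD edge keeps whatever subtask the record was already on, simulated here as a different one.
        int forwarded = (expected + 1) % parallelism;
        System.out.println(isCorrectlyPartitioned(key, expected, maxParallelism, parallelism));  // true
        System.out.println(isCorrectlyPartitioned(key, forwarded, maxParallelism, parallelism)); // false
    }
}
```

With a parallelism of 1 every key maps to subtask 0, which is consistent with the exception not appearing in that case.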

I don't know why "union" is behaving like that though. I could not find that 
mentioned in any documentation.

Thanks a lot for your help.

Regards,
Averell


On Sun, May 5, 2019 at 11:22 PM Yun Tang <myas...@live.com> wrote:
Hi Averell

I think this is because, after 'union', the input stream no longer follows 
the rule that the data must be pre-partitioned in EXACTLY the same way Flink's 
keyBy would partition it [1]. An easy way to verify this is to refer to [2] and 
check whether each sub-task of the union stream contains exactly what the 
downstream task contains.

Best
Yun Tang


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[2] 
https://github.com/apache/flink/blob/4e505c67542a45c82c763c12099bdfc621c9e476/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java#L223


From: Averell mailto:lvhu...@gmail.com>>
Sent: Sunday, May 5, 2019 16:43
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Hi everyone,

I have a big stream A, filtered by flags from a small stream B, then unioned
with another stream C to become the input for my CEP.
As the three streams A, B, C are all keyed, I expected that the output
stream resulting from connecting/unioning them would also be keyed, thus I
used /reinterpretAsKeyedStream/ before putting it into CEP. And with this, I
got the error /IllegalArgumentException/ (full stack-trace below).
If I use parallelism of 1, or if I don't use reinterpretAsKeyedStream (and
use /keyBy/ manually), then there's no such exception.

I don't know how to debug this error, and not sure whether I should use
keyed streams with CEP?

Thanks and best regards,
Averell


My code:
/   val cepInput = streamA.keyBy(r => (r.id1, r.id2))
.connect(streamB.keyBy(r => (r.id1, r.id2)))
.flatMap(new MyCandidateFilterFunction())
.union(streamC.keyBy(r => (r.id1, r.id2)))

val cepOutput =
MyCEP(new DataStreamUtils(cepInput).reinterpretAsKeyedStream(r 
=> (r.id1,
r.id2)),
counter1, counter2,
threshold1, threshold2)

object MyCEP {
def apply(input: KeyedStream[Event, _],
  longPeriod: Int,
  threshold: Int,
  shortPeriod: Int): DataStream[Event] = {

val patternLineIsUp = Pattern.begin[Event]("period1")
.where((value: event, ctx: 
CepContext[Event]) => accSum(_.counter,
Seq("period1"), value, ctx) < threshold)
.times(longPeriod - 
shortPeriod).consecutive()
  .next("period2")
.where((value: Event, ctx: 
CepContext[Event]) =>
accSum(_.counter, 
Seq("period1", "period2"), value, ctx) < threshold
&& value.status == "up")
.times(shortPeriod).consecutive()

collectPattern(input, patternLineIsUp)
}

private def accSum(f: Event => Long, keys: Seq[String], 
currentEvent:
Event, ctx: CepContext[Event]): Long = {
keys.map(key => 
ctx.getEventsForPattern(key).map(f).sum).sum +
f(currentEvent)
}

private def collectPattern(inputStream: KeyedStream[Event, _], 
pattern:
Pattern[Event, Event]): DataStream[Event] =
CEP.pattern(inputStream, pattern)
.process((map: util.Map[String, 
util.List[Event]], ctx:
PatternProcessFunction.Context, co

Re: IllegalArgumentException with CEP & reinterpretAsKeyedStream

2019-05-05 Thread Yun Tang
Hi Averell

I think this is because, after 'union', the input stream no longer follows the 
rule that the data must be pre-partitioned in EXACTLY the same way Flink’s 
keyBy would partition it [1]. An easy way to verify this is to follow the 
approach in [2] and check whether each sub-task of the unioned stream contains 
exactly the keys that the corresponding downstream task expects.
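
To make the rule concrete, here is a minimal Java sketch of how a keyed 
operator decides which sub-task owns a key. The key-group-to-sub-task 
arithmetic follows Flink's KeyGroupRangeAssignment as far as I know, but the 
hash below is a simplified stand-in for Flink's murmur hash, and the class and 
method names are hypothetical, so treat it as an illustration only.

```java
import java.util.Objects;

final class KeyGroupSketch {

    // Simplified stand-in for Flink's murmur hash of key.hashCode().
    // Assumption for illustration: the concrete key groups differ from Flink's.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(Objects.hashCode(key), maxParallelism);
    }

    // Maps a key group to the index of the sub-task that owns it.
    static int subtaskForKeyGroup(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // The sub-task on which a keyed operator expects to see this key.
    static int expectedSubtask(Object key, int parallelism, int maxParallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), parallelism, maxParallelism);
    }

    // True if a record that physically arrived at actualSubtask is where
    // a keyed operator would expect it to be.
    static boolean isOnExpectedSubtask(Object key, int actualSubtask,
                                       int parallelism, int maxParallelism) {
        return expectedSubtask(key, parallelism, maxParallelism) == actualSubtask;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        String key = "id1|id2";
        for (int parallelism : new int[] {1, 4}) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                System.out.println("parallelism=" + parallelism + " subtask=" + subtask
                        + " ok=" + isOnExpectedSubtask(key, subtask, parallelism, maxParallelism));
            }
        }
    }
}
```

With a parallelism of 1 every key maps to sub-task 0, which would be 
consistent with the exception disappearing at parallelism 1. With a higher 
parallelism, a record forwarded without a hash shuffle can sit on a sub-task 
that does not own its key group.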

Best
Yun Tang


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[2] 
https://github.com/apache/flink/blob/4e505c67542a45c82c763c12099bdfc621c9e476/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java#L223


From: Averell 
Sent: Sunday, May 5, 2019 16:43
To: user@flink.apache.org
Subject: IllegalArgumentException with CEP & reinterpretAsKeyedStream

Hi everyone,

I have a big stream A, filtered by flags from a small stream B, then unioned
with another stream C to become the input for my CEP.
As the three streams A, B, C are all keyed, I expected that the output
stream resulting from connecting/unioning them would also be keyed, thus I
used /reinterpretAsKeyedStream/ before putting it into CEP. And with this, I
got the error /IllegalArgumentException/ (full stack-trace below).
If I use parallelism of 1, or if I don't use reinterpretAsKeyedStream (and
use /keyBy/ manually), then there's no such exception.

I don't know how to debug this error, and not sure whether I should use
keyed streams with CEP?

Thanks and best regards,
Averell


My code:
val cepInput = streamA.keyBy(r => (r.id1, r.id2))
  .connect(streamB.keyBy(r => (r.id1, r.id2)))
  .flatMap(new MyCandidateFilterFunction())
  .union(streamC.keyBy(r => (r.id1, r.id2)))

val cepOutput =
  MyCEP(new DataStreamUtils(cepInput).reinterpretAsKeyedStream(r => (r.id1, r.id2)),
    counter1, counter2,
    threshold1, threshold2)

object MyCEP {
  def apply(input: KeyedStream[Event, _],
            longPeriod: Int,
            threshold: Int,
            shortPeriod: Int): DataStream[Event] = {

    val patternLineIsUp = Pattern.begin[Event]("period1")
      .where((value: Event, ctx: CepContext[Event]) =>
        accSum(_.counter, Seq("period1"), value, ctx) < threshold)
      .times(longPeriod - shortPeriod).consecutive()
      .next("period2")
      .where((value: Event, ctx: CepContext[Event]) =>
        accSum(_.counter, Seq("period1", "period2"), value, ctx) < threshold
          && value.status == "up")
      .times(shortPeriod).consecutive()

    collectPattern(input, patternLineIsUp)
  }

  // Sum of f over all events matched so far for the given pattern names, plus the current event.
  private def accSum(f: Event => Long, keys: Seq[String],
                     currentEvent: Event, ctx: CepContext[Event]): Long = {
    keys.map(key => ctx.getEventsForPattern(key).map(f).sum).sum + f(currentEvent)
  }

  // Emits the last event matched by "period2" for every complete match.
  private def collectPattern(inputStream: KeyedStream[Event, _],
                             pattern: Pattern[Event, Event]): DataStream[Event] =
    CEP.pattern(inputStream, pattern)
      .process((map: util.Map[String, util.List[Event]],
                ctx: PatternProcessFunction.Context,
                collector: Collector[Event]) => {
        val records = map.get("period2")
        collector.collect(records.get(records.size() - 1))
      })
}

The exception:
Exception in thread "main" 12:43:13,103 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Stopped Akka RPC service.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.mycompany.StreamingJob$.main(Streaming.scala:440)
at com.mycompany.StreamingJob.main(Streaming.scala)
Caused by: java.lang.IllegalArgumentException
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)

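Re: [State Backend] A question: checkpoint restore fails

2019-05-02 Thread Yun Tang
Hi
The stack trace shows that the stream being read was closed while the state 
was being restored. If HDFS itself is healthy, this should not be the root 
cause. Are there any other exceptions in the log?

Best
Yun Tang



Sent from my Xiaomi phone
On April 30, 2019, at 16:30, eric wrote:

Hi all,


I am new to Flink and ran a program to test state checkpointing:
1) The data source is a socket and the job uses a keyed state backend; I 
submit the job and let it run for a while.
2) I then close the source socket, at which point the job enters the failed 
state.
3) I wait a few seconds and reopen the source socket.
4) Flink then reconnects to the socket and tries to recover the job, but the 
recovery fails: the stored MapState is not restored.


Environment:
flink: 1.8.0
Flink Hadoop jar: flink-shaded-hadoop2-uber-2.6.5-1.8.0.jar
HDFS file system: hadoop2.6.0-cdh5.16.1
Running in standalone mode; neither the fssystem nor the rocksdb state 
backend worked.



Error log: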


Caused by: java.io.IOException: Stream closed
at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:892)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:963)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:757)
at 
java.io.FilterInputStream.read(FilterInputStream.java:83)
at 
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
at 
org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
at 
org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:41)
at 
java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at org.apache.flink.types.StringValue.readString(StringValue.java:769)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
at 
org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:148)
at 
org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
at 
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74)
at 
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:290)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:251)
at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:127)


Re: Errors after saving checkpoints to HDFS for a while

2019-04-30 Thread Yun Tang
Hi 志鹏

The root cause is an HDFS problem:
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff
 could only be replicated to 0 nodes instead of minReplication (=1).  There are 
3 datanode(s) running and no node(s) are excluded in this operation.

When the problem occurs, check the state of the cluster's HDFS and the related 
logs.
This Stack Overflow answer [1] may also help you.


[1] 
https://stackoverflow.com/questions/36015864/hadoop-be-replicated-to-0-nodes-instead-of-minreplication-1-there-are-1/36310025

Best
Yun Tang



From: 邵志鹏 
Sent: Tuesday, April 30, 2019 15:26
To: user-zh@flink.apache.org
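Subject: Errors after saving checkpoints to HDFS for a while

After saving checkpoints to HDFS for a while, the job reports the error below. 
After an automatic restart it runs normally for a while and then reports the 
same error again.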

Caused by: java.io.IOException: Could not flush and close the file system 
output stream to 
hdfs://master:9000/flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff
 in order to obtain the stream state handle
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
at 
org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:765)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.callInternal(HeapKeyedStateBackend.java:724)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File 
/flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff
 could only be replicated to 0 nodes instead of minReplication (=1).  There are 
3 datanode(s) running and no node(s) are excluded in this operation.
at 
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1726)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:265)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2567)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:829)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:510)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2489)

at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1489)
at org.apache.hadoop.ipc.Client.call(Client.java:1435)
at org.apache.hadoop.ipc.Client.call(Client.java:1345)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy17.addBlock(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:444)
at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
at 

Re: How to let Flink 1.7.X run Flink session cluster on YARN in Java 7 default environment

2019-04-27 Thread Yun Tang
Hi Zhangjun

Thanks for your reply!

However, the Flink user mailing list is conducted in English, and the user-zh 
mailing list is specifically for Chinese. Replying in Chinese on the Flink user 
mailing list is somewhat unfriendly to non-Chinese speakers.

I think your reply could be translated as the official requirements [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/projectsetup/java_api_quickstart.html#requirements


From: 126 
Sent: Sunday, April 28, 2019 8:24
To: 胡逸才
Cc: imj...@gmail.com; dev; user
Subject: Re: How to let Flink 1.7.X run Flink session cluster on YARN in Java 7 
default environment

Flink's source code uses many Java 1.8 features, so JDK 1.7 will not work.

Sent from my iPhone

On April 26, 2019, at 17:48, 胡逸才 <huyc...@163.com> wrote:

At present, all our YARN clusters run in a Java 7 environment.

While trying to use Flink to deploy our stream-processing business scenarios, 
we found that Flink on YARN always fails to run a session task. The YARN 
application log shows "Unsupported major.minor version 52.0".
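
For reference, 52 is the class-file major version written by a Java 8 
compiler, and a Java 7 JVM only accepts versions up to 51. The small Java 
sketch below reads the major version from a class-file header and maps it to a 
Java release; the class and method names are hypothetical and the header bytes 
in main are made up for illustration.

```java
import java.nio.ByteBuffer;

final class ClassVersionSketch {

    // A class file starts with: u4 magic (0xCAFEBABE), u2 minor_version, u2 major_version.
    static int majorVersion(byte[] classFileHeader) {
        ByteBuffer buf = ByteBuffer.wrap(classFileHeader); // big-endian by default
        if (buf.getInt() != 0xCAFEBABE) {
            throw new IllegalArgumentException("not a class file");
        }
        buf.getShort();                 // minor version, ignored here
        return buf.getShort() & 0xFFFF; // major version as an unsigned value
    }

    // For Java 5 and later: release = major - 44 (51 -> Java 7, 52 -> Java 8).
    static int javaRelease(int majorVersion) {
        if (majorVersion < 49) {
            throw new IllegalArgumentException("only Java 5+ is handled in this sketch");
        }
        return majorVersion - 44;
    }

    public static void main(String[] args) {
        // Made-up header of a class compiled for Java 8 (major version 52).
        byte[] header = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE, 0, 0, 0, 52};
        System.out.println("requires Java " + javaRelease(majorVersion(header)));
    }
}
```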

I tried the solution from the mailing list of adding 
env.java.home: <JDK1.8PATH> to flink-conf.yaml, and added 
-yD yarn.taskmanager.env.JAVA_HOME=<JDK1.8PATH>, 
-yD containerized.master.env.JAVA_HOME=<JDK1.8PATH> and 
-yD containerized.taskmanager.env.JAVA_HOME=<JDK1.8PATH> to the startup 
command. The Flink session cluster on YARN still cannot run the application in 
a Java 8 environment.

So can I use Flink 1.7.X to submit a Flink session cluster application on YARN 
under a Java 7 environment?






Re: Hbase Connector failed when deployed to yarn

2019-04-11 Thread Yun Tang
Hi

I believe this is the same problem as the one reported in 
https://issues.apache.org/jira/browse/FLINK-12163. The current workaround is 
to put the flink-hadoop-compatibility jar under FLINK_HOME/lib.

Best
Yun Tang

From: hai 
Sent: Thursday, April 11, 2019 21:06
To: user
Subject: Re: Hbase Connector failed when deployed to yarn


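And my pom.xml dependencies are:

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.21</version>
  </dependency>
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.1</version>
  </dependency>
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>cglib</groupId>
    <artifactId>cglib</artifactId>
    <version>2.2.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
</dependencies>
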

 Original Message
Sender: hai
Recipient: user@flink.apache.org
Date: Thursday, Apr 11, 2019 21:04
Subject: Hbase Connector failed when deployed to yarn


Hello:

I am new to Flink, and I copied the official HBase connector example from the 
source tree:

flink/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
(https://github.com/apache/flink/tree/master/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example)

and ran it on a yarn-cluster with the command:

bin/flink run -m yarn-cluster -yn 2 -c {class-path-prefix}.HBaseWriteExample {my-application}.jar

What I get is:


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
Caused by: java.

Re: Metrics for received records per TaskManager

2019-04-02 Thread Yun Tang
Hi Benjamin

Flink could support to report its metrics to external system such as 
Prometheus, Graphite and so on [1]. And you could then use web front end such 
as Grafana to query those system. Take `numBytesInLocalPerSecond` metrics for 
example, it would have many metrics tags and one of them is `tm_id` (task 
manager id). And if you group this metrics by `tm_id` to a specific task 
manager node, you would view received bytes from local at that task manager.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#reporter

Best
Yun Tang

From: Benjamin Burkhardt 
Sent: Wednesday, April 3, 2019 0:21
To: user@flink.apache.org; Yun Tang
Subject: Re: Metrics for received records per TaskManager

Hi Yun,

thank you for the advice, but how would you suggest doing it to get the metrics 
also for each TaskManager?
I do not urgently need to use REST because I’m running my code within Flink. 
Maybe there is another way to access it?

Thanks a lot.

Benjamin


Re: Metrics for received records per TaskManager

2019-04-02 Thread Yun Tang
Hi Benjamin

Try this:
http://localhost:8081/jobs/{job-id}/vertices/{vertices-id}/subtasks/{subtask-index}/metrics?get=numBytesInLocalPerSecond

You can GET http://localhost:8081/jobs/ to list the running jobs, and 
similarly GET http://localhost:8081/jobs/{job-id}/vertices/ to list all 
vertices.

However, AFAIK, if you query through the REST API you cannot directly get the 
received records per task manager; you have to gather these metrics per task.
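
The per-task gathering could be sketched in Java roughly as follows: build the 
per-subtask URL, fetch each value, then sum the values by task manager. Only 
the URL pattern comes from the message above. The mapping from subtask to task 
manager id, the sample values, and the class and method names are assumptions 
for illustration, and the HTTP call itself is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SubtaskMetricsSketch {

    // Builds the per-subtask metrics URL quoted in the message above.
    static String metricUrl(String baseUrl, String jobId, String vertexId,
                            int subtaskIndex, String metric) {
        return baseUrl + "/jobs/" + jobId + "/vertices/" + vertexId
                + "/subtasks/" + subtaskIndex + "/metrics?get=" + metric;
    }

    // Sums per-subtask values by the task manager each subtask runs on.
    // taskManagerOfSubtask[i] is the (assumed known) task manager id of subtask i.
    static Map<String, Double> sumByTaskManager(String[] taskManagerOfSubtask,
                                                double[] valueOfSubtask) {
        Map<String, Double> totals = new LinkedHashMap<>();
        for (int i = 0; i < taskManagerOfSubtask.length; i++) {
            totals.merge(taskManagerOfSubtask[i], valueOfSubtask[i], Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(metricUrl("http://localhost:8081", "{job-id}", "{vertices-id}",
                0, "numBytesInLocalPerSecond"));
        // Hypothetical values for three subtasks spread over two task managers.
        String[] taskManagers = {"tm-a", "tm-b", "tm-a"};
        double[] values = {1.0, 2.0, 4.0};
        System.out.println(sumByTaskManager(taskManagers, values));
    }
}
```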

Best
Yun Tang

From: Benjamin Burkhardt 
Sent: Tuesday, April 2, 2019 21:56
To: user@flink.apache.org; Yun Tang
Subject: Re: Metrics for received records per TaskManager

Hi Yun,

thanks for the hint. I tried to access the metric through the REST API by 
calling 
http://localhost:8081/taskmanagers/2264f296385854f2d1fb4d121495822a/metrics?get=numBytesInRemotePerSecond.

Unfortunately the metric is not available...

Only these are available:
[{"id":"Status.Network.AvailableMemorySegments"},{"id":"Status.JVM.Memory.NonHeap.Committed"},{"id":"Status.JVM.Memory.Mapped.TotalCapacity"},{"id":"Status.JVM.Memory.NonHeap.Used"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Count"},{"id":"Status.Network.TotalMemorySegments"},{"id":"Status.JVM.Memory.Direct.MemoryUsed"},{"id":"Status.JVM.Memory.Mapped.MemoryUsed"},{"id":"Status.JVM.CPU.Time"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Count"},{"id":"Status.JVM.Threads.Count"},{"id":"Status.JVM.GarbageCollector.G1_Old_Generation.Time"},{"id":"Status.JVM.Memory.Direct.TotalCapacity"},{"id":"Status.JVM.Memory.Heap.Committed"},{"id":"Status.JVM.ClassLoader.ClassesLoaded"},{"id":"Status.JVM.Memory.Mapped.Count"},{"id":"Status.JVM.Memory.Direct.Count"},{"id":"Status.JVM.CPU.Load"},{"id":"Status.JVM.Memory.Heap.Used"},{"id":"Status.JVM.Memory.Heap.Max"},{"id":"Status.JVM.ClassLoader.ClassesUnloaded"},{"id":"Status.JVM.GarbageCollector.G1_Young_Generation.Time"},{"id":"Status.JVM.Memory.NonHeap.Max"}]


How do I enable it, maybe in the flink-conf?

Thanks.

Benjamin


Re: Metrics for received records per TaskManager

2019-04-02 Thread Yun Tang
Hi Benjamin

I think 'numBytesInLocalPerSecond' and 'numBytesInRemotePerSecond', which 
indicate 'The number of bytes this task reads from a local source per second' 
and 'The number of bytes this task reads from a remote source per second' 
respectively, could help you. If you want to track this information per 
TaskManager, please group the metrics by the tag 'tm_id'.

You could refer to 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
  for more information.

Best
Yun Tang


From: Benjamin Burkhardt 
Sent: Tuesday, April 2, 2019 15:00
To: user@flink.apache.org
Subject: Metrics for received records per TaskManager

Hi all,

I’m looking for a metric which allows me to keep track of the records or bytes 
each TaskManager has received or processed for the current task.

Can anyone help me get this?

Thanks.

Benjamin


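Re: Flink HA HDFS directory permission problem

2019-04-01 Thread Yun Tang
I suspect your HDFS is configured with a default user hdfs, so directories are 
always created as the hdfs user. Check on the YARN page whether the user 
running the Flink application is root. The simplest workaround is, as 
described in [1], to set the environment variable HADOOP_USER_NAME to hdfs, so 
that you operate as the hdfs user when you submit a job with the flink run 
command line.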

export HADOOP_USER_NAME=hdfs

[1] 
https://stackoverflow.com/questions/11371134/how-to-specify-username-when-putting-files-on-hdfs-from-a-remote-machine
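
As a rough illustration of why user=root is refused on an hdfs:hdfs 
drwxr-xr-x directory while the hdfs user is not, here is a minimal Java sketch 
of an owner/group/other write check. It ignores the HDFS super-user and ACLs, 
and the class and method names are hypothetical, so it is a simplified model 
rather than the NameNode's actual logic.

```java
import java.util.Collections;
import java.util.Set;

final class HdfsPermissionSketch {

    // permission is the 10-character string printed by `hadoop fs -ls`, e.g. "drwxr-xr-x".
    // Exactly one class of bits applies: owner, then group, then other.
    static boolean canWrite(String permission, String owner, String group,
                            String user, Set<String> userGroups) {
        if (user.equals(owner)) {
            return permission.charAt(2) == 'w'; // owner write bit
        }
        if (userGroups.contains(group)) {
            return permission.charAt(5) == 'w'; // group write bit
        }
        return permission.charAt(8) == 'w';     // other write bit
    }

    public static void main(String[] args) {
        Set<String> noGroups = Collections.emptySet();
        // root is neither the owner nor in the group, so the "other" bits (r-x) apply.
        System.out.println("root: " + canWrite("drwxr-xr-x", "hdfs", "hdfs", "root", noGroups));
        // Acting as hdfs (for example via HADOOP_USER_NAME) selects the owner bits (rwx).
        System.out.println("hdfs: " + canWrite("drwxr-xr-x", "hdfs", "hdfs", "hdfs", noGroups));
    }
}
```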


From: 孙森 
Sent: Monday, April 1, 2019 16:16
To: user-zh@flink.apache.org
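Subject: Re: Flink HA HDFS directory permission problem

Changing the directory permissions takes effect for the existing files, but 
newly created directories still have no write permission.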

[root@hdp1 ~]# hadoop fs -ls /flink/ha
Found 15 items
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/0e950900-c00e-4f24-a0bd-880ba9029a92
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/42e61028-e063-4257-864b-05f46e121a4e
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/58465b44-1d38-4f46-a450-edc06d2f625f
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/61b6a5b8-1e11-4ac1-99e4-c4dce842aa38
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/931291f3-717c-4ccb-a622-0207037267a8
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:14 
/flink/ha/application_1553766783203_0026
drwxr-xr-x   - hdfs hdfs  0 2019-04-01 16:13 
/flink/ha/application_1553766783203_0028
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/b2d16faa-ae2e-4130-81b8-56eddb9ef317
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/bef09af0-6462-4c88-8998-d18f922054a1
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/bf486c37-ab44-49a1-bb66-45be4817773d
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/c07351fb-b2d8-4aec-801c-27a983ca3f32
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/d779d3a2-3ec8-4998-ae9c-9d93ffb7f265
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:12 
/flink/ha/dee74bc7-d450-4fb4-a9f2-4983d1f9949f
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/edd59fcf-8413-4ceb-92cf-8dcd637803f8
drwxrwxrwx   - hdfs hdfs  0 2019-04-01 15:13 
/flink/ha/f6329551-56fb-4c52-a028-51fd838c4af6

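> On April 1, 2019, at 4:02 PM, Yun Tang wrote:
>
> Hi 孙森,
>
> Adding the submitting user root to Hadoop's hdfs user group, or submitting 
> the program as Hadoop's hdfs user [1], or opening up the permissions of the 
> whole HDFS:///flink/ha directory [2] to any user should solve the problem. 
> Remember to add -R so that the change also applies to all subdirectories.
>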
>
> [1] 
> https://stackoverflow.com/questions/11371134/how-to-specify-username-when-putting-files-on-hdfs-from-a-remote-machine
> [2] 
> https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html#chmod
>
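> Best
> Yun Tang
>
> From: 孙森
> Sent: Monday, April 1, 15:50
> Subject: Flink HA HDFS directory permission problem
> To: user-zh@flink.apache.org
>
>
> Hi all :
> I start Flink in Flink-on-YARN mode with high availability configured. When 
> I submit a job to the Flink cluster, a permission denied exception occurs. 
> The reason is that the folders created under HDFS:///flink/ha all have 
> permission 755, with no write permission. Every time a new Flink cluster is 
> started, a new directory is generated, for example 
> /flink/ha/application_1553766783203_0026, and I have to change the 
> permissions of /flink/ha/application_1553766783203_0026 before a job can be 
> submitted successfully. How should I solve this problem?
>
> The exception is as follows:
> The program finished with the following exception: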
>
> org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't 
> retrieve Yarn cluster
>at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:409)
>at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:111)
>at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:253)
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>at java.security.AccessController.doPrivileged(Native Method)
>at javax.security.auth.Subject.doAs(Subject.java:422)
>at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.hadoop.security.AccessControlException: Permission 
> denied: user=root, access=WRITE, 
> inode="/flink/ha/application_1553766783203_0026/blob":hdfs:hdfs:drwxr-xr-x
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:353)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:325)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:246)
>at 
> org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
> 

Re: Permission problem with the Flink HA HDFS directory

2019-04-01 Thread Yun Tang
Hi 孙森,

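Adding the submitting user root to Hadoop's hdfs user group, or submitting the program as Hadoop's hdfs user [1], or changing the permissions of the whole HDFS:///flink/ha directory [2]
so that it is open to any user, should solve the problem. Remember to add -R so that the change also applies to subdirectories.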
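
To make the failing check concrete, here is a minimal plain-Java sketch of a POSIX-style write check, fed with the values from the exception (user=root, inode owned by hdfs:hdfs, mode rwxr-xr-x). The class and method names are made up for illustration, this is not HDFS code, and it ignores HDFS superuser and supergroup handling.

```java
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Collections;
import java.util.Set;

// Hypothetical helper, not part of Hadoop or Flink.
final class HdfsWriteCheck {

    /**
     * Simplified POSIX-style check: the owner is judged by the owner bits,
     * members of the owning group by the group bits, everyone else by the
     * "other" bits. Superuser handling is deliberately left out.
     */
    static boolean canWrite(String user, Set<String> userGroups,
                            String owner, String group, String mode) {
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString(mode);
        if (user.equals(owner)) {
            return perms.contains(PosixFilePermission.OWNER_WRITE);
        }
        if (userGroups.contains(group)) {
            return perms.contains(PosixFilePermission.GROUP_WRITE);
        }
        return perms.contains(PosixFilePermission.OTHERS_WRITE);
    }

    public static void main(String[] args) {
        // Assumption: root belongs to no group that matters here.
        Set<String> noGroups = Collections.emptySet();
        // Values from the exception: root falls under "other", which has no write bit.
        System.out.println(canWrite("root", noGroups, "hdfs", "hdfs", "rwxr-xr-x"));
        // Submitting as the owning user hdfs passes the owner check.
        System.out.println(canWrite("hdfs", noGroups, "hdfs", "hdfs", "rwxr-xr-x"));
        // After a recursive chmod that opens the directory to everyone, root passes too.
        System.out.println(canWrite("root", noGroups, "hdfs", "hdfs", "rwxrwxrwx"));
    }
}
```

The first call mirrors the denied request; the other two correspond to submitting as the hdfs user and to opening up the directory permissions.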


[1] 
https://stackoverflow.com/questions/11371134/how-to-specify-username-when-putting-files-on-hdfs-from-a-remote-machine
[2] 
https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/FileSystemShell.html#chmod

Best
Yun Tang

From: 孙森
Sent: Monday, April 1, 15:50
Subject: Permission problem with the Flink HA HDFS directory
To: user-zh@flink.apache.org


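Hi all,
I start Flink in Flink-on-YARN mode with high availability configured. When I
submit a job to the Flink cluster, a permission denied exception occurs.
The cause is that the directories created under HDFS:///flink/ha all have permission 755, which leaves no write permission for users other than the owner. Every time a new Flink
cluster is started, a new directory is created,
for example /flink/ha/application_1553766783203_0026. I have to change the permissions of /flink/ha/application_1553766783203_0026 before a job can be submitted successfully. How should this problem be solved?

The exception is as follows: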
 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve 
Yarn cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:409)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.retrieve(AbstractYarnClusterDescriptor.java:111)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:253)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.hadoop.security.AccessControlException: Permission 
denied: user=root, access=WRITE, 
inode="/flink/ha/application_1553766783203_0026/blob":hdfs:hdfs:drwxr-xr-x
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:353)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:325)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:246)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1950)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1934)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1917)
at 
org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:71)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4181)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1109)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:645)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)




Best!

Sen




Re: [DISCUSS] Remove forceAvro() and forceKryo() from the ExecutionConfig

2019-03-26 Thread Yun Tang
Hi Konstantin

I think there is no direct relationship between registering 
chill-protobuf/chill-thrift with Kryo for Protobuf/Thrift types and forcing 
POJOs to use Kryo.
Both Protobuf and Thrift types are extracted as GenericTypeInfo by the 
TypeExtractor, which uses KryoSerializer by default. If we do not 
register chill-protobuf and chill-thrift with Kryo as [1] suggests, we would 
run into unexpected exceptions. PojoTypeInfo, however, only uses KryoSerializer 
when we call enableForceKryo(). These are two different aspects and use 
different fields within ExecutionConfig.

Best
Yun Tang

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html

From: Konstantin Knauf 
Sent: Tuesday, March 26, 2019 17:37
To: Stephan Ewen
Cc: Yun Tang; dev; user
Subject: Re: [DISCUSS] Remove forceAvro() and forceKryo() from the 
ExecutionConfig

Hi Stephan,

I am in favor of renaming forceKryo() instead of removing it, because users 
might plug in their Protobuf/Thrift serializers via Kryo as advertised in our 
documentation [1]. For this, Kryo needs to be used for POJO types as well, if I 
am not mistaken.

Cheers,

Konstantin

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html


On Tue, Mar 26, 2019 at 10:03 AM Stephan Ewen <se...@apache.org> wrote:
Compatibility is really important for checkpointed state.
For that, you can always directly specify GenericTypeInfo or AvroTypeInfo if 
you want to continue to treat a type via Kryo or Avro.

Alternatively, once https://issues.apache.org/jira/browse/FLINK-11917 is 
implemented, this should happen automatically.

On Tue, Mar 26, 2019 at 8:33 AM Yun Tang <myas...@live.com> wrote:
Hi Stephan

I prefer to remove 'enableForceKryo', since the Kryo serializer does not handle 
schema evolution well out of the box due to its mutable properties, 
and our built-in POJO serializer already supports schema evolution.

On the other hand, what is the backward compatibility plan for enableForceAvro() 
and enableForceKryo()? I think once 
https://issues.apache.org/jira/browse/FLINK-11917 is merged, we could support 
migrating state that was a POJO but was serialized using Kryo.

Best
Yun Tang

From: Stephan Ewen <se...@apache.org>
Sent: Tuesday, March 26, 2019 2:31
To: dev; user
Subject: [DISCUSS] Remove forceAvro() and forceKryo() from the ExecutionConfig

Hi all!

The ExecutionConfig has some very old settings: forceAvro() and forceKryo(), 
which are actually misleadingly named. They cause POJOs to use Avro or Kryo 
rather than the POJO serializer.

I think we do not have a good case any more to use Avro for POJOs. POJOs that 
are also Avro types go through the Avro serializer anyways.

There may be a case to use Kryo for POJOs if you don't like the Flink POJO 
serializer.

I would suggest to remove the "forceAvro()" option completely.
For "forceKryo()", I am torn between removing it completely or renaming it to 
"setUseKryoForPOJOs()".

What are the opinions on that out there?

Best,
Stephan



--

Konstantin Knauf | Solutions Architect

+49 160 91394525

https://www.ververica.com/


Follow us @VervericaData

--

Join Flink Forward<https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: Specifying NameNode high availability in RocksDB

2019-03-26 Thread Yun Tang
Hi

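For the storage directories used by Flink's high-availability configuration: when the storage path is set to HDFS, NameNode high availability is provided by HDFS itself and is completely transparent to the layers above.

Best
Yun Tang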

From: 戴嘉诚 
Sent: Tuesday, March 26, 2019 16:57
To: user-zh@flink.apache.org
Subject: Specifying NameNode high availability in RocksDB

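Hi, I have a question. For the RocksDB location in Flink
I specified an HDFS path, but that path hard-codes the address of one NameNode, while my HDFS has two NameNode addresses. Is there a way, similar to HDFS HA, to switch NameNode addresses seamlessly when the active
NameNode goes down? Otherwise, when that NameNode goes down, my Flink job goes down with it.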



Re: What is Flinks primary API language?

2019-03-26 Thread Yun Tang
Hi Ilya

I believe Java is the main implementation language of Flink internals; 
flink-core is the kernel module and is implemented in Java.

What's more:
FLIP-6: replaces the Scala-implemented JobManager.scala and TaskManager.scala with the new 
JobMaster.java and TaskExecutor.java.
FLIP-32: makes flink-table Scala-free.

At the API level, however, I have never heard of any plan to stop 
supporting Scala.

Best
Yun Tang




From: Ilya Karpov 
Sent: Tuesday, March 26, 2019 15:21
To: user@flink.apache.org
Subject: What is Flinks primary API language?

Hello,
our dev team is choosing a language for developing Flink jobs. Most likely 
we will use the flink-streaming API (at least in the very beginning). Because of 
our previous experience developing Spark jobs, the current choice is the Scala API. 
However, I recently found a 
ticket (https://issues.apache.org/jira/browse/FLINK-11063) with a 
discussion (https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals) 
in which the Java API could become the first-class citizen to overcome the shortcomings 
of Java/Scala interoperability, and another 
one (https://issues.apache.org/jira/browse/FLINK-6756) about API 
methods missing from the Scala API. That makes me think the Java API is becoming the priority. We are 
comfortable with both Java and Scala, but we want to implement Flink jobs and 
supporting utilities on the API that progresses faster. Is the Java API that 
one? Is Java the main implementation language of Flink internals and API, and 
is Scala (or is it gravitating toward being) a wrapper around the Java API?


Re: [DISCUSS] Remove forceAvro() and forceKryo() from the ExecutionConfig

2019-03-26 Thread Yun Tang
Hi Stephan

I prefer to remove 'enableForceKryo', since the Kryo serializer does not handle 
schema evolution well out of the box due to its mutable properties, 
and our built-in POJO serializer already supports schema evolution.

On the other hand, what is the backward compatibility plan for enableForceAvro() 
and enableForceKryo()? I think once 
https://issues.apache.org/jira/browse/FLINK-11917 is merged, we could support 
migrating state that was a POJO but was serialized using Kryo.

Best
Yun Tang

From: Stephan Ewen 
Sent: Tuesday, March 26, 2019 2:31
To: dev; user
Subject: [DISCUSS] Remove forceAvro() and forceKryo() from the ExecutionConfig

Hi all!

The ExecutionConfig has some very old settings: forceAvro() and forceKryo(), 
which are actually misleadingly named. They cause POJOs to use Avro or Kryo 
rather than the POJO serializer.

I think we do not have a good case any more to use Avro for POJOs. POJOs that 
are also Avro types go through the Avro serializer anyways.

There may be a case to use Kryo for POJOs if you don't like the Flink POJO 
serializer.

I would suggest to remove the "forceAvro()" option completely.
For "forceKryo()", I am torn between removing it completely or renaming it to 
"setUseKryoForPOJOs()".

What are the opinions on that out there?

Best,
Stephan



Re: Reserving Kafka offset in Flink after modifying app

2019-03-26 Thread Yun Tang
Hi Son

I think the cause might be that no operator ids are assigned to your Filter and 
Map functions. You could refer to [1] to assign ids in your application. 
Moreover, if you have removed any operators, please consider adding the 
--allowNonRestoredState [2] option.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

Best
Yun Tang


From: Son Mai 
Sent: Tuesday, March 26, 2019 9:51
To: Konstantin Knauf
Cc: user
Subject: Re: Reserving Kafka offset in Flink after modifying app

Hi Konstantin,

Thanks for the response. What still concerns me is:

  1.  Am I able to recover from checkpoints even if I change my program (for 
example, changing Filter and Map functions, data objects, ...)? I was not able 
to recover from savepoints when I changed my program.

On Mon, Mar 25, 2019 at 3:19 PM Konstantin Knauf <konstan...@ververica.com> wrote:
Hi Son,

yes, this is possible, but your sink needs to play its part in Flink's 
checkpointing mechanism. Depending on the implementation of the sink you should 
either:

* implement CheckpointedFunction and flush all records to BigQuery in 
snapshotState. This way, in case of a failure/restart of the job, all records up 
to the last successful checkpoint will have been written to BigQuery and all 
other records will be replayed.
* use managed operator state to store all pending records in the sink. Thereby 
they will be persisted in snapshotState. This way, in case of a 
failure/restart of the job, all records up to the last successful checkpoint 
that have not yet been written to BigQuery will be restored in the sink, and all 
other records will be replayed.

In both cases, you might write the same record to the BigQuery twice.
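
To illustrate where the duplicates come from, here is a small plain-Java simulation with no Flink dependencies. It assumes the sink writes whenever a batch fills up and additionally flushes at each checkpoint, and that a restart rewinds the source to the last checkpointed offset. The class name, method name and parameters are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; it does not use any Flink API.
final class FlushOnCheckpointDemo {

    /**
     * Processes offsets [0, total), fails once when the source reaches crashAt,
     * and returns everything that ended up in the external system.
     */
    static List<Integer> run(int total, int batchSize, int checkpointEvery, int crashAt) {
        List<Integer> external = new ArrayList<>(); // stand-in for BigQuery
        List<Integer> buffer = new ArrayList<>();   // pending records held by the sink
        int checkpointedOffset = 0;                 // offset restored after a failure
        boolean crashed = false;
        int offset = 0;
        while (offset < total) {
            if (!crashed && offset == crashAt) {
                // Failure: the in-memory buffer is lost and the source rewinds.
                crashed = true;
                buffer.clear();
                offset = checkpointedOffset;
                continue;
            }
            buffer.add(offset);
            offset++;
            boolean checkpoint = offset % checkpointEvery == 0;
            if (checkpoint || buffer.size() == batchSize) {
                // Flush on a full batch and always before a checkpoint completes.
                external.addAll(buffer);
                buffer.clear();
            }
            if (checkpoint) {
                checkpointedOffset = offset;
            }
        }
        external.addAll(buffer); // final flush at end of input
        return external;
    }

    public static void main(String[] args) {
        // Batches of 2, checkpoint every 5 records, one failure at offset 4.
        System.out.println(run(10, 2, 5, 4));
    }
}
```

In this run the records written between the last checkpoint and the failure are written again after the restart, so every record arrives at least once and some arrive twice. Deduplicating on the BigQuery side would be one way to cope with that.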

If in doubt if your sink fulfills the criteria above, feel free to share it.

Cheers,

Konstantin



On Mon, Mar 25, 2019 at 7:50 AM Son Mai <hongson1...@gmail.com> wrote:
Hello,

I have a topic in Kafka that Flink reads from. I parse the messages in 
this topic and write them to BigQuery using streaming inserts in batches of 500 
messages, using a CountWindow in Flink.

Problem: I want to commit manually only when a batch has been written successfully 
to BigQuery.

Reason:
I saw that the Flink KafkaConsumer does not rely on committing offsets to Kafka but uses 
its own checkpointing. I don't know how Flink checkpointing works, and I'm 
worried that it does not cover the following situation:
- Let's say I have a Flink job running and processing a batch of 500 messages 
at Kafka offsets 1000-1500.
- I stop this job before it saves to BigQuery and make some modifications 
to the program. Savepoints did not work when I tried, because they required that 
the operators' code not change.

What I want is that, whenever I start the modified app, it starts from 
offsets 1000-1500 in Kafka, because these messages have not been written to 
BigQuery.

Is there any way to achieve this in Flink?

Thanks,
SM




Re: Flink 1.7.2 cluster exits abnormally

2019-03-12 Thread Yun Tang
Hi

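Is it possible that you have not configured a checkpoint 
path and have not explicitly configured FsStateBackend or RocksDBStateBackend? This is most likely a bug in MemoryStateBackend 
that occurs when HA is configured but no checkpoint path is. See the JIRA issue I created earlier: 
https://issues.apache.org/jira/browse/FLINK-11107

The related PR has already been submitted, but the community regards MemoryStateBackend mostly as a debugging tool or 
an experimental toy that would not be used directly in production, and it has recently been busy with the release-1.8 release, so the code has not been reviewed yet.

Best
Yun Tang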

From: ppp Yun 
Sent: Wednesday, March 13, 2019 10:24
To: user-zh
Subject: Flink 1.7.2 cluster exits abnormally

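Hi all,

I wrote a test program. After running for a little under three hours, the Flink cluster went down and all nodes exited, with the following error: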

2019-03-12 20:45:14,623 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Tbox from 
Kafka Sink To Kafka And Print (21949294d4750b869b341c5d2942d499) switched from 
state RUNNING to FAILING.
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException):
 The directory item limit of /tmp/ha is exceeded: limit=1048576 items=1048576


hdfs count result:

20971514  124334563 hdfs://banma/tmp/ha


Below is the flink-conf.yaml configuration:

[hdfs@qa-hdpdn06 flink-1.7.2]$ cat conf/flink-conf.yaml |grep ^[^#]
jobmanager.rpc.address: 10.4.11.252
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 10
parallelism.default: 1
high-availability: zookeeper
high-availability.storageDir: hdfs://banma/tmp/ha
high-availability.zookeeper.quorum: qa-hdpdn05.ebanma.com:2181
rest.port: 8081

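Flink version: the latest official Flink 1.7.2

Why does the high-availability.storageDir directory end up with so many subdirectories? What is stored in them? Under what circumstances are these writes triggered? How can I avoid this problem?

Thanks!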


Re: estimate number of keys on rocks db

2019-03-10 Thread Yun Tang
Hi Avi

Unfortunately, we cannot see the attached images. By the way, do you use any 
windows in this job?

Best
Yun Tang

From: Avi Levi 
Sent: Sunday, March 10, 2019 19:41
To: user
Subject: estimate number of keys on rocks db

Hi,
I am trying to estimate the number of keys at a given minute.
I created a graph based on 
avg_over_time <https://prometheus.io/docs/prometheus/latest/querying/functions/#aggregation_over_time> 
with 1h and 5m intervals. Looking at the graph, you can see that it has high 
spikes, which doesn't make sense (IMO). How can the average have those spikes? 
After all, since I do not delete keys, I would expect it to go up or remain the same.
Any ideas what could explain such behaviour?
Attached are graphs for the 5m and 1h intervals.
[Screen Shot 2019-03-10 at 13.37.44.png]
[Screen Shot 2019-03-10 at 13.33.40.png]





Re: Is taskmanager.heap.mb a valid configuration parameter in 1.7?

2019-03-07 Thread Yun Tang
Hi

Yes, `taskmanager.heap.mb` is deprecated but still honored to keep backward 
compatibility.

I have already created an issue 
https://issues.apache.org/jira/browse/FLINK-11860 to move these deprecated 
options in documentation.
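
As an illustration of what "deprecated but still honored" usually means for a config lookup, here is a plain-Java sketch that prefers `taskmanager.heap.size` and falls back to `taskmanager.heap.mb`, read as a plain number of megabytes. It is not Flink's actual code; the class, the method names and the tiny size parser (only `m` and `g` suffixes) are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical lookup helper; Flink's own configuration classes are not used here.
final class HeapConfigLookup {
    static final String NEW_KEY = "taskmanager.heap.size";
    static final String DEPRECATED_KEY = "taskmanager.heap.mb";

    /** Returns the heap size in megabytes, preferring the new key over the deprecated one. */
    static long heapMegabytes(Map<String, String> conf, long defaultMb) {
        String current = conf.get(NEW_KEY);
        if (current != null) {
            return parseMegabytes(current);
        }
        String deprecated = conf.get(DEPRECATED_KEY);
        if (deprecated != null) {
            // The deprecated key carries a bare number of megabytes, without a unit.
            return Long.parseLong(deprecated.trim());
        }
        return defaultMb;
    }

    /** Parses values such as "1024m" or "2g"; other units are out of scope for this sketch. */
    static long parseMegabytes(String value) {
        String s = value.trim().toLowerCase();
        String number = s.substring(0, s.length() - 1).trim();
        if (s.endsWith("g")) {
            return Long.parseLong(number) * 1024;
        }
        if (s.endsWith("m")) {
            return Long.parseLong(number);
        }
        throw new IllegalArgumentException("Unsupported size: " + value);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(DEPRECATED_KEY, "2048");
        System.out.println(heapMegabytes(conf, 1024)); // only the deprecated key is set
        conf.put(NEW_KEY, "4g");
        System.out.println(heapMegabytes(conf, 1024)); // the new key takes precedence
    }
}
```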

Best
Yun Tang



From: anaray 
Sent: Friday, March 8, 2019 13:23
To: user@flink.apache.org
Subject: Is taskmanager.heap.mb a valid configuration parameter in 1.7?

Hi,

I see a reference about  *taskmanager.heap.mb* in 1.7.1 config docs
(https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html).
I thought taskmanager.heap.mb got deprecated and new config is
taskmanager.heap.size. Please correct me if I am wrong here.

Thank you.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?

2019-03-06 Thread Yun Tang
Hi Jack

How about extracting flink-metrics-prometheus-1.6.1.jar from the downloaded 
distribution tar at https://archive.apache.org/dist/flink/flink-1.6.1/ and uploading 
it to `/usr/lib/flink/lib` on EMR?

Otherwise, I believe setting up a customized Flink cluster on EMR [1] should work if 
there is no more convenient solution.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#custom-emr-installation


Best
Yun Tang

From: Jack Tuck 
Sent: Thursday, March 7, 2019 3:39
To: user@flink.apache.org
Subject: How to monitor Apache Flink in AWS EMR (ElasticMapReduce)?


I currently have Flink setup and have a Job running on EMR and I'm now trying 
to add monitoring by sending metrics off to prometheus.



I have come across an issue with running Flink on EMR. I'm using Terraform to 
provision EMR (I run ansible after to download and run a job).  Out the box, it 
does not look like EMR's Flink distribution includes the optional jars 
(flink-metrics-prometheus, flink-cep, etc).



Looking at Flink's documentation, it says

> "In order to use this reporter you must copy 
> `/opt/flink-metrics-prometheus-1.6.1.jar` into the `/lib` folder of your 
> Flink distribution"

https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter



But when I log into the EMR master node, neither /etc/flink nor /usr/lib/flink 
has a directory called `opts`, and I cannot see 
`flink-metrics-prometheus-1.6.1.jar` anywhere.



I know Flink has other optional libs you'd usually have to copy if you want to 
use them such as flink-cep, but I'm not sure how to do this when using EMR.



This is the exception I get, which I believe is because it cannot find the 
metrics jar on its classpath.

```

java.lang.ClassNotFoundException: 
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter

at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

at java.lang.Class.forName0(Native Method)

at java.lang.Class.forName(Class.java:264)

at 
org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:144)

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:419)

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:276)

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:227)

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)

at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)

at 
org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint.main(YarnSessionClusterEntrypoint.java:137)

```
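
The trace above fails inside `Class.forName`, so one quick way to check whether a jar is really visible is to probe for the reporter class with the same classpath that Flink uses. Below is a minimal sketch; the class name `ReporterClasspathProbe` is made up, and it only tells you whether the class can be loaded, not whether the reporter is configured correctly.

```java
// Hypothetical diagnostic helper; it uses only the JDK.
final class ReporterClasspathProbe {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: locate the class without running its static initializers.
            Class.forName(className, false, ReporterClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String reporter = "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter";
        System.out.println(reporter + " on classpath: " + isOnClasspath(reporter));
    }
}
```

Running it with the Flink lib directory on the classpath (for example `/usr/lib/flink/lib/*`, assuming that is where EMR keeps the jars) should print `true` once the metrics jar is in place.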



EMR resource in terraform

```
resource "aws_emr_cluster" "emr_flink" {

  name  = "ce-emr-flink-arn"

  release_label = "emr-5.20.0" # 5.21.0 is not found, could be a region thing

  applications  = ["Flink"]



  ec2_attributes {

key_name  = "ce_test"

subnet_id = "${aws_subnet.ce_test_subnet_public.id}"

instance_profile  = 
"${aws_iam_instance_profile.emr_profile.arn}"

emr_managed_master_security_group = "${aws_security_group.allow_all_vpc.id}"

emr_managed_slave_security_group  = "${aws_security_group.allow_all_vpc.id}"

additional_master_security_groups  = 
"${aws_security_group.external_connectivity.id}"

additional_slave_security_groups  = 
"${aws_security_group.external_connectivity.id}"

  }



  ebs_root_volume_size = 100

  master_instance_type = "m4.xlarge"

  core_instance_type   = "m4.xlarge"

  core_instance_count  = 2



  service_role = "${aws_iam_role.iam_emr_service_role.arn}"



  configurations_json = <
