Re: Seeking advice on PyFlink resource optimization

2021-04-05 Thread Dian Fu
The processing logic itself looks fine.

1) Could you describe the data-latency problem you mentioned in more detail? What QPS are you reaching now, and what is the target?
2) Which deployment mode are you using?
3) For setting the parallelism, see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/table_environment.html#configuration
4) For the memory-related configuration options, see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#memory-configuration ; they can be set the same way as in 3) (see the sketch below).
5) As for the concrete parallelism/memory values, that depends entirely on your business logic and the QPS you need to reach; you will have to benchmark to find suitable settings.
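For 3) and 4), a minimal sketch of what this looks like in the PyFlink 1.12 Table API; the configuration keys come from the pages linked above, while the concrete values are only placeholders that have to be tuned for the actual workload (and, depending on the deployment mode, the memory options may instead need to go into flink-conf.yaml or the submission command):

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

config = t_env.get_config().get_configuration()
config.set_string("parallelism.default", "8")                   # default operator parallelism
config.set_string("taskmanager.numberOfTaskSlots", "4")         # slots per TaskManager
config.set_string("taskmanager.memory.process.size", "4096m")   # total TaskManager memory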


> On Apr 6, 2021, at 11:43 AM, 郭华威 wrote:
> 
> hidden email
> 
> On Apr 6, 2021, at 11:36, 苗红宾 wrote:
> Hello:
> The business scenario: the source is Kafka, with roughly 10 GB of data every 10 minutes covering 200 cities. We want a sliding window, grouped by city, that fires every 2 minutes, collects each city's data from the past 10 minutes into its own list, converts it to pandas, and runs one whole-city computation; each city's computation takes about 60 s on average.
> 
> 
> Current approach:
> 1. slide_window = 
> Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w")
> 2. The Kafka connector is registered with a SQL DDL statement.
> 3. The result table is a plain print sink:
> CREATE TABLE sink (
> city_id STRING ,
> start_time TIMESTAMP ,
> end_time TIMESTAMP ,
> flag   STRING
> ) with (
>   'connector' = 'print'
> )
> 4. A UDAF writes the source data to a CSV file, source.select("write_csv()"), and the computation function is then called to read the CSV file back.
> 5. The computation is triggered by calling the user-defined function strategy_flow_entry inside the select: source.window(slide_window).group_by("w, city_id").select("strategy_flow_entry(concat_result)").execute_insert("sink").wait()
> 
> 
> With this approach we keep running into all kinds of problems while the job runs, such as data latency.
> 
> 
> So I would like to ask:
> 1. What is the recommended approach for this scenario? Is the current approach perhaps wrong?
> 2. How should the job submission parameters be set: CPU cores, memory, parallelism, slots, and so on?
> 
> 
> Many thanks



(no subject)

2021-04-05 Thread 郭华威
Unsubscribe

Reply: Seeking advice on PyFlink resource optimization

2021-04-05 Thread 郭华威
hidden email


Seeking advice on PyFlink resource optimization

2021-04-05 Thread 苗红宾
Hello:
The business scenario: the source is Kafka, with roughly 10 GB of data every 10 minutes covering 200 cities. We want a sliding window, grouped by city, that fires every 2 minutes, collects each city's data from the past 10 minutes into its own list, converts it to pandas, and runs one whole-city computation; each city's computation takes about 60 s on average.


Current approach:
1. slide_window = 
Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w")
2. The Kafka connector is registered with a SQL DDL statement.
3. The result table is a plain print sink:
CREATE TABLE sink (
city_id STRING ,
start_time TIMESTAMP ,
end_time TIMESTAMP ,
flag   STRING
) with (
'connector' = 'print'
)
4. A UDAF writes the source data to a CSV file, source.select("write_csv()"), and the computation function is then called to read the CSV file back.
5. The computation is triggered by calling the user-defined function strategy_flow_entry inside the select (a sketch follows below): source.window(slide_window).group_by("w, city_id").select("strategy_flow_entry(concat_result)").execute_insert("sink").wait()




With this approach we keep running into all kinds of problems while the job runs, such as data latency.


So I would like to ask:
1. What is the recommended approach for this scenario? Is the current approach perhaps wrong?
2. How should the job submission parameters be set: CPU cores, memory, parallelism, slots, and so on?


Many thanks

Streaming read of a Hive table throws java.lang.ArrayIndexOutOfBoundsException: -1

2021-04-05 Thread 83315898
Hi,
1. Data is written from Kafka into Hive; the Hive table format is Parquet.

2. When a pipeline reads the Hive table as a stream, it throws java.lang.ArrayIndexOutOfBoundsException: -1.
This can be reproduced in the Flink SQL client:
1) A plain select:
select *
from ubtCatalog.ubtHive.event_all_dwd 
/*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.partition.include'='all', 
'streaming-source.monitor-interval'='5s', 
'streaming-source.consume-order'='partition-time','streaming-source.consume-start-offset'='2021-01-01')
 */
;

2) The same error as in 1), java.lang.ArrayIndexOutOfBoundsException: -1, also occurs with an aggregate query:
select count(xubtappid)
from ubtCatalog.ubtHive.event_all_dwd 
/*+ OPTIONS('streaming-source.enable'='true', 
'streaming-source.partition.include'='all', 
'streaming-source.monitor-interval'='5s', 
'streaming-source.consume-order'='partition-time','streaming-source.consume-start-offset'='2021-01-01')
 */
;

The exception:
2021-04-02 10:06:26
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
  at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
  at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
  at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)
  at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:469)
  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)
  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
  at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
  at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
  at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
  at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
  at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
  at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
  at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
  at akka.actor.ActorCell.invoke(ActorCell.scala:561)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
  at akka.dispatch.Mailbox.run(Mailbox.scala:225)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
  at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'Source: HiveSource-ubtHive.event_all_dwd' (operator 
bc764cd8ddf7a0cff126f51c16239658).
  at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:466)
  at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
  at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:240)
  at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:247)
  at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:44)
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Failed to 
enumerate files
  at 
org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator.handleNewSplits(ContinuousHiveSplitEnumerator.java:148)
  at 
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$4(ExecutorNotifier.java:135)
  at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
  ... 3 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
  at 
org.apache.flink.connectors.hive.util.HivePartitionUtils.toHiveTablePartition(HivePartitionUtils.java:167)
  at 

Re: After startup, one slot on one TM does no work at all; it looks like there is simply no communication.

2021-04-05 Thread yidan zhao
This problem has occurred many times now. There is also another case where it does not happen at startup.
When it happens at startup, the symptom is that there is no watermark at all, i.e. nothing is ever shown for the watermark.
In the other case the job runs normally after startup, and after some time (possibly a few hours, possibly many days) the watermark suddenly stalls indefinitely, which leads to unbounded backpressure.

yidan zhao wrote on Wed, Mar 3, 2021 at 2:47 PM:

> As in the subject. The logs:
> 2021-03-03 11:03:17,151 WARN org.apache.flink.runtime.util.HadoopUtils []
> - Could not find Hadoop configuration via any of the supported methods
> (Flink configuration, environment variables).
>
> 2021-03-03 11:03:17,344 WARN org.apache.hadoop.util.NativeCodeLoader [] -
> Unable to load native-hadoop library for your platform... using
> builtin-java classes where applicable
>
> 2021-03-03 11:03:17,441 WARN org.apache.flink.runtime.util.HadoopUtils []
> - Could not find Hadoop configuration via any of the supported methods
> (Flink configuration, environment variables).
>
> 2021-03-03 11:03:18,226 WARN
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
> SASL configuration failed: javax.security.auth.login.LoginException: No
> JAAS configuration section named 'Client' was found in specified JAAS
> configuration file:
> '/home/work/antibotFlink/flink-1.12.0/tmp/jaas-1092430908919603833.conf'.
> Will continue connection to Zookeeper server without SASL authentication,
> if Zookeeper server allows it.
>
> 2021-03-03 11:03:18,227 ERROR
> org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] -
> Authentication failed
>
> 2021-03-03 11:03:18,957 ERROR akka.remote.transport.netty.NettyTransport
> [] - failed to bind to /0.0.0.0:2027, shutting down Netty transport
> 2021-03-03 11:03:18,973 ERROR akka.remote.Remoting [] - Remoting system
> has been terminated abrubtly. Attempting to shut down transports
>
> As shown above, there is an error about the port. Does anyone know the cause?
> The direct symptom and impact is that one of my source tasks produces no output at all (no output here means no data of any kind; bytes
> sent is 0). As a result the downstream nodes get no watermark, and backpressure stays permanently at 1 (which also produces a scenario I had already found strange: the job is backpressured into doing no work, and the CPU is no longer utilized at all...).
>


Re: Flink Kubernetes application keeps restarting TaskManagers

2021-04-05 Thread Yang Wang
Your CPU request is very small, and K8s enforces it strictly.

I suspect the TM starts very slowly and keeps timing out before it can register, which makes it fail; you can check the TM log to confirm.

Also, judging from the log you posted, the REST endpoint should already have started successfully and can be accessed.

Best,
Yang

casel.chen wrote on Mon, Apr 5, 2021 at 10:05 AM:

> While recently trying out Flink Kubernetes application mode, I found that TMs keep being requested and then terminated, and the LoadBalancer-type REST service never becomes ready, so the Flink web
> UI cannot be reached. The K8s logs are below. What is the cause? Is it because the resources I requested are too small?
>
>
> = Startup parameters
> "kubernetes.jobmanager.cpu": "0.1",
> "kubernetes.taskmanager.cpu": "0.1",
> "taskmanager.numberOfTaskSlots": "1",
> "jobmanager.memory.process.size": "1024m",
> "taskmanager.memory.process.size": "1024m",
>
>
> = K8s logs
>
>
>
> 2021-04-05 09:55:14,777 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] -
> JobManager successfully registered at ResourceManager, leader id:
> 9903e058fb5ca6f418c78dafcad048f1.
> 2021-04-05 09:55:14,869 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Registered job manager 84684c706147dc0958ff0929c8024...@akka.tcp://
> flink@172.17.0.5:6123/user/rpc/jobmanager_2 for job
> .
> 2021-04-05 09:55:14,869 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Registered job manager 84684c706147dc0958ff0929c8024...@akka.tcp://
> flink@172.17.0.5:6123/user/rpc/jobmanager_2 for job
> .
> 2021-04-05 09:55:14,870 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] -
> Requesting new slot [SlotRequestId{3bcf44c03f742d211b5abcc9d0d35068}] and
> profile ResourceProfile{UNKNOWN} with allocation id
> 17bcd11a1d493155e3ed45cfd200be79 from resource manager.
> 2021-04-05 09:55:14,871 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Registered job manager 84684c706147dc0958ff0929c8024...@akka.tcp://
> flink@172.17.0.5:6123/user/rpc/jobmanager_2 for job
> .
> 2021-04-05 09:55:14,871 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Request slot with profile ResourceProfile{UNKNOWN} for job
>  with allocation id
> 17bcd11a1d493155e3ed45cfd200be79.
> 2021-04-05 09:55:14,974 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requesting new worker with resource spec WorkerResourceSpec {cpuCores=0.1,
> taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb
> (241591914 bytes)}, current pending count: 1.
> 2021-04-05 09:55:15,272 INFO
> org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
> Enabled external resources: []
> 2021-04-05 09:55:18,570 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Creating
> new TaskManager pod with name
> flink-k8s-native-application-cluster-taskmanager-1-1 and resource
> <1024,0.1>.
> 2021-04-05 09:55:22,669 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> flink-k8s-native-application-cluster-taskmanager-1-1 is created.
> 2021-04-05 09:55:22,670 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
> new TaskManager pod: flink-k8s-native-application-cluster-taskmanager-1-1
> 2021-04-05 09:55:22,770 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker flink-k8s-native-application-cluster-taskmanager-1-1 with
> resource spec WorkerResourceSpec {cpuCores=0.1, taskHeapSize=25.600mb
> (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb
> (67108864 bytes), managedMemSize=230.400mb (241591914 bytes)}.
> 2021-04-05 09:56:35,494 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker flink-k8s-native-application-cluster-taskmanager-1-1 with resource
> spec WorkerResourceSpec {cpuCores=0.1, taskHeapSize=25.600mb (26843542
> bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes),
> managedMemSize=230.400mb (241591914 bytes)} was requested in current
> attempt and has not registered. Current pending count after removing: 0.
> 2021-04-05 09:56:35,494 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker flink-k8s-native-application-cluster-taskmanager-1-1 is terminated.
> Diagnostics: Pod terminated, container termination statuses:
> [flink-task-manager(exitCode=1, reason=Error, message=null)]
> 2021-04-05 09:56:35,495 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requesting new worker with resource spec WorkerResourceSpec {cpuCores=0.1,
> taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb
> (241591914 bytes)}, current pending count: 1.
> 2021-04-05 09:56:35,496 INFO
> org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
> Enabled external resources: []
> 2021-04-05 09:56:35,498 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Creating
> 

What is the difference between partitioning by key and hash partitioning in Flink 1.2?

2021-04-05 Thread 刘文




What is the difference between partitioning by key and hash partitioning in Flink 1.2?
For example: partition index = hash(key) % parallelism?


Why not simply partition by the hash directly?


KeyGroupStreamPartitioner.java

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
    }
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
}
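For reference, a minimal sketch (in Python, as a stand-in for the Java above) of what KeyGroupRangeAssignment.assignKeyToParallelOperator computes: the key is first folded into a key group bounded by the maximum parallelism, and the key group is then mapped onto a parallel subtask. This indirection through key groups is what allows keyed state to be redistributed when the job is rescaled, which a plain hash(key) % parallelism would not allow. The hash() call below is only a placeholder for Flink's Murmur hash of key.hashCode():

def assign_key_to_parallel_operator(key, max_parallelism, parallelism):
    # 1) key group: a stable hash of the key, folded into [0, max_parallelism)
    key_group = hash(key) % max_parallelism
    # 2) operator index: key groups form contiguous ranges, one range per parallel subtask
    return key_group * parallelism // max_parallelism

# Example: with max_parallelism=128 and parallelism=4,
# key groups 0..31 go to subtask 0, 32..63 to subtask 1, and so on.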

Re: [External] : Union of more than two streams

2021-04-05 Thread Fuyao Li
Hello BB,


  1.  For the datastream approach, you can use the broadcast pattern to build state 
to enrich your data instead of a join.
 *   You can define something like this,
Class CodebookData{
 private Currency currency;
 private OrganizationUnit organizationUnit;
 ...
}


 *   You can leverage a broadcast stream[1] since, as you mentioned, your code book 
streams don’t have much data. This is a good use case for the broadcast pattern. 
Connect the wrapper-class datastream with the main stream and simply enrich it 
with the state you built. Not sure if this fits into your use case…. Please 
check.
  2.  I am not sure, but lateral table join (temporal join) is designed to handle 
this kind of data-enrichment workload. You have a main table and a probe-side table… I 
suppose there is some internal optimization, though maybe I am wrong... In theory it 
is still based on a join, in case you had overlooked that part. Anyway, Flink SQL 
will make the join easier.


Reference:
[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Best,
Fuyao

From: B.B. 
Date: Monday, April 5, 2021 at 06:27
To: Fuyao Li 
Subject: Re: [External] : Union of more than two streams
Hi Fuyao,
Thanks for your input.
I have follow up questions regarding your advices.

Could you elaborate a little more on case a) of your suggested DataStream solution? 
When you create that kind of generalized type, how would you join it 
with the main stream? Which key would you use?
I was thinking of creating a wrapper class that holds all the data 
from the code books. For example:
Class CodebookData{
 private Currency currency;
 private OrganizationUnit organizationUnit;
 ...
}
But then I have the problem of which key to use to join with the main stream, because 
currency has its own key currencyId, the organization unit has its own key 
organizationId, and so on.

Regarding your second suggested solution with Flink SQL, what do you mean by
“For such join, there should be some internal optimization and might get rid 
of some memory consumption issues”?

Thx in advance

BB


On Mon, 5 Apr 2021 at 07:29, Fuyao Li wrote:
Hello BB,

Just want to share you some of my immature ideas. Maybe some experts can give 
you better solutions and advice.

  1.  DataStream based solution:

 *   To do a union, as you already know, the datastreams must all have 
the same format. Otherwise, you can’t do it. There is a workaround for 
your problem. You can ingest the datastreams with a deserializationSchema and 
map the different code book streams to the same Java type: one field holds the 
foreign key value (codebook_fk1, codebook_fk2 values are all stored here), 
another field just contains the name of the foreign key (e.g. codebook_fk1). 
All other fields should also be generalized into such a Java type. After that, 
you can union these different code book streams and join with the 
mainstream.
 *   Cascading connected streams is, I guess, not a suggested approach; 
in addition to memory, I think it will also make the watermark hard to 
coordinate.

  2.  Flink SQL approach:

You can try to use a Flink temporal table join to do the join work here [1][2]. 
With this approach, you cascade the joins to enrich the mainstream. This 
seems to fit your use case, since your enrichment streams don’t change 
often and contain things like currency. For such a join there should be 
some internal optimization that might get rid of some memory consumption issues, 
I guess? Maybe I am wrong. But it is worth a look.




Reference:
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#event-time-temporal-join

Best,
Fuyao



From: B.B.
Date: Friday, April 2, 2021 at 01:41
To: user@flink.apache.org
Subject: [External] : Union of more than two streams
Hi,

I have an architecture question regarding the union of more than two streams in 
Apache Flink.

We have three and sometimes more streams that are a kind of code book 
with which we have to enrich the main stream.
The code book streams are compacted Kafka topics. Code books are something that 
doesn't change very often, e.g. currency. The main stream is a fast event stream.

The idea is to make a union of all code books, join it with the main stream, and 
store the enrichment data as managed, keyed state (so when compact events 

Re: Flink - Pod Identity

2021-04-05 Thread Austin Cawley-Edwards
And actually, I've found that the correct version of the AWS SDK *is*
included in Flink 1.12, which was reported and fixed in FLINK-18676
(see[1]). Since you said you saw this also occur in 1.12, can you share
more details about what you saw there?

Best,
Austin

[1]: https://issues.apache.org/jira/browse/FLINK-18676

On Mon, Apr 5, 2021 at 4:53 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> That looks interesting! I've also found the full list of S3 properties[1]
> for the version of presto-hive bundled with Flink 1.12 (see [2]), which
> includes an option for a KMS key (hive.s3.kms-key-id).
>
> (also, adding back the user list)
>
> [1]:
> https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
>
> On Mon, Apr 5, 2021 at 4:21 PM Swagat Mishra  wrote:
>
>> Btw, there is also an option to provide a custom credential provider,
>> what are your thoughts on this?
>>
>> presto.s3.credentials-provider
>>
>>
>> On Tue, Apr 6, 2021 at 12:43 AM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> I've confirmed that for the bundled + shaded aws dependency, the only
>>> way to upgrade it is to build a flink-s3-fs-presto jar with the updated
>>> dependency. Let me know if this is feasible for you, if the KMS key
>>> solution doesn't work.
>>>
>>> Best,
>>> Austin
>>>
>>> On Mon, Apr 5, 2021 at 2:18 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hi Swagat,

 I don't believe there is an explicit configuration option for the KMS
 key – please let me know if you're able to make that work!

 Best,
 Austin

 On Mon, Apr 5, 2021 at 1:45 PM Swagat Mishra 
 wrote:

> Hi Austin,
>
> Let me know what you think on my latest email, if the approach might
> work, or if it is already supported and I am not using the configurations
> properly.
>
> Thanks for your interest and support.
>
> Regards,
> Swagat
>
> On Mon, Apr 5, 2021 at 10:39 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Swagat,
>>
>> It looks like Flink 1.6 bundles the 1.11.165 version of the
>> aws-java-sdk-core with the Presto implementation (transitively from 
>> Presto
>> 0.185[1]).
>> The minimum support version for the ServiceAccount authentication
>> approach is 1.11.704 (see [2]) which was released on Jan 9th, 2020[3], 
>> long
>> after Flink 1.6 was released. It looks like even the most recent Presto 
>> is
>> on a version below that, concretely 1.11.697 in the master branch[4], so 
>> I
>> don't think even upgrading Flink to 1.6+ will solve this though it looks 
>> to
>> me like the AWS dependency is managed better in more recent Flink 
>> versions.
>> I'll have more for you on that front tomorrow, after the Easter break.
>>
>> I think what you would have to do to make this authentication
>> approach work for Flink 1.6 is building a custom version of the
>> flink-s3-fs-presto jar, replacing the bundled AWS dependency with the
>> 1.11.704 version, and then shading it the same way.
>>
>> In the meantime, would you mind creating a JIRA ticket with this use
>> case? That'll give you the best insight into the status of fixing this :)
>>
>> Let me know if that makes sense,
>> Austin
>>
>> [1]:
>> https://github.com/prestodb/presto/blob/1d4ee196df4327568c0982811d8459a44f1792b9/pom.xml#L53
>> [2]:
>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
>> [3]: https://github.com/aws/aws-sdk-java/releases/tag/1.11.704
>> [4]: https://github.com/prestodb/presto/blob/master/pom.xml#L52
>>
>> On Sun, Apr 4, 2021 at 3:32 AM Swagat Mishra 
>> wrote:
>>
>>> Austin -
>>>
>>> In my case the set up is such that services are deployed on
>>> Kubernetes with Docker, running on EKS. There is also an istio service
>>> mesh. So all the services communicate and access AWS resources like S3
>>> using the service account. Service account is associated with IAM 
>>> roles. I
>>> have verified that the service account has access to S3, by running a
>>> program that connects to S3 to read a file also aws client when
>>> packaged into the pod is able to access S3. So that means the roles and
>>> policies are good.
>>>
>>> When I am running flink, I am following the same configuration for
>>> job manager and task manager as provided here:
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>>>
>>> The exception we are getting is -
>>> org.apache.flink.fs.s3presto.shaded.com.amazonaws.SDKClientException:
>>> Unable 

Re: Flink - Pod Identity

2021-04-05 Thread Austin Cawley-Edwards
That looks interesting! I've also found the full list of S3 properties[1]
for the version of presto-hive bundled with Flink 1.12 (see [2]), which
includes an option for a KMS key (hive.s3.kms-key-id).

(also, adding back the user list)

[1]:
https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#hadooppresto-s3-file-systems-plugins

On Mon, Apr 5, 2021 at 4:21 PM Swagat Mishra  wrote:

> Btw, there is also an option to provide a custom credential provider,
> what are your thoughts on this?
>
> presto.s3.credentials-provider
>
>
> On Tue, Apr 6, 2021 at 12:43 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> I've confirmed that for the bundled + shaded aws dependency, the only way
>> to upgrade it is to build a flink-s3-fs-presto jar with the updated
>> dependency. Let me know if this is feasible for you, if the KMS key
>> solution doesn't work.
>>
>> Best,
>> Austin
>>
>> On Mon, Apr 5, 2021 at 2:18 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hi Swagat,
>>>
>>> I don't believe there is an explicit configuration option for the KMS
>>> key – please let me know if you're able to make that work!
>>>
>>> Best,
>>> Austin
>>>
>>> On Mon, Apr 5, 2021 at 1:45 PM Swagat Mishra  wrote:
>>>
 Hi Austin,

 Let me know what you think on my latest email, if the approach might
 work, or if it is already supported and I am not using the configurations
 properly.

 Thanks for your interest and support.

 Regards,
 Swagat

 On Mon, Apr 5, 2021 at 10:39 PM Austin Cawley-Edwards <
 austin.caw...@gmail.com> wrote:

> Hi Swagat,
>
> It looks like Flink 1.6 bundles the 1.11.165 version of the
> aws-java-sdk-core with the Presto implementation (transitively from Presto
> 0.185[1]).
> The minimum support version for the ServiceAccount authentication
> approach is 1.11.704 (see [2]) which was released on Jan 9th, 2020[3], 
> long
> after Flink 1.6 was released. It looks like even the most recent Presto is
> on a version below that, concretely 1.11.697 in the master branch[4], so I
> don't think even upgrading Flink to 1.6+ will solve this though it looks 
> to
> me like the AWS dependency is managed better in more recent Flink 
> versions.
> I'll have more for you on that front tomorrow, after the Easter break.
>
> I think what you would have to do to make this authentication approach
> work for Flink 1.6 is building a custom version of the flink-s3-fs-presto
> jar, replacing the bundled AWS dependency with the 1.11.704 version, and
> then shading it the same way.
>
> In the meantime, would you mind creating a JIRA ticket with this use
> case? That'll give you the best insight into the status of fixing this :)
>
> Let me know if that makes sense,
> Austin
>
> [1]:
> https://github.com/prestodb/presto/blob/1d4ee196df4327568c0982811d8459a44f1792b9/pom.xml#L53
> [2]:
> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
> [3]: https://github.com/aws/aws-sdk-java/releases/tag/1.11.704
> [4]: https://github.com/prestodb/presto/blob/master/pom.xml#L52
>
> On Sun, Apr 4, 2021 at 3:32 AM Swagat Mishra 
> wrote:
>
>> Austin -
>>
>> In my case the set up is such that services are deployed on
>> Kubernetes with Docker, running on EKS. There is also an istio service
>> mesh. So all the services communicate and access AWS resources like S3
>> using the service account. Service account is associated with IAM roles. 
>> I
>> have verified that the service account has access to S3, by running a
>> program that connects to S3 to read a file also aws client when
>> packaged into the pod is able to access S3. So that means the roles and
>> policies are good.
>>
>> When I am running flink, I am following the same configuration for
>> job manager and task manager as provided here:
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>>
>> The exception we are getting is -
>> org.apache.flink.fs.s3presto.shaded.com.amazonaws.SDKClientException:
>> Unable to load credentials from service end point.
>>
>> This happens in the EC2CredentialFetcher class method
>> fetchCredentials - line number 66, when it tries to read resource,
>> effectively executing
>> CURL 169.254.170.2/AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
>>
>> I am not setting the variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
>> because its not the right way to do it for us, we are on EKS. Similarly 
>> any
>> of the ~/.aws/credentials file approach will also not work for us.
>>
>>
>> Atm, I 

Re: Flink - Pod Identity

2021-04-05 Thread Swagat Mishra
Hi Austin,

Thanks for your reply.

Atm, I have upgraded to the 1.12 version of Flink, but I still see the same
issue. I have taken a look at Presto as well. I am looking to
experiment with settings like S3_KMS_KEY_ID (provided in the link
below). If this doesn't work, I will look at modifying the Presto code to have
a custom version that supports pod identity through a service account.

Yes, I can create a JIRA ticket for you.

https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3ConfigurationUpdater.java

Regards,
Swagat

On Mon, Apr 5, 2021 at 10:39 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi Swagat,
>
> It looks like Flink 1.6 bundles the 1.11.165 version of the
> aws-java-sdk-core with the Presto implementation (transitively from Presto
> 0.185[1]).
> The minimum support version for the ServiceAccount authentication approach
> is 1.11.704 (see [2]) which was released on Jan 9th, 2020[3], long after
> Flink 1.6 was released. It looks like even the most recent Presto is on a
> version below that, concretely 1.11.697 in the master branch[4], so I don't
> think even upgrading Flink to 1.6+ will solve this though it looks to me
> like the AWS dependency is managed better in more recent Flink versions.
> I'll have more for you on that front tomorrow, after the Easter break.
>
> I think what you would have to do to make this authentication approach
> work for Flink 1.6 is building a custom version of the flink-s3-fs-presto
> jar, replacing the bundled AWS dependency with the 1.11.704 version, and
> then shading it the same way.
>
> In the meantime, would you mind creating a JIRA ticket with this use case?
> That'll give you the best insight into the status of fixing this :)
>
> Let me know if that makes sense,
> Austin
>
> [1]:
> https://github.com/prestodb/presto/blob/1d4ee196df4327568c0982811d8459a44f1792b9/pom.xml#L53
> [2]:
> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
> [3]: https://github.com/aws/aws-sdk-java/releases/tag/1.11.704
> [4]: https://github.com/prestodb/presto/blob/master/pom.xml#L52
>
> On Sun, Apr 4, 2021 at 3:32 AM Swagat Mishra  wrote:
>
>> Austin -
>>
>> In my case the set up is such that services are deployed on Kubernetes
>> with Docker, running on EKS. There is also an istio service mesh. So all
>> the services communicate and access AWS resources like S3 using the service
>> account. Service account is associated with IAM roles. I have verified that
>> the service account has access to S3, by running a program that connects to
>> S3 to read a file also aws client when packaged into the pod is able to
>> access S3. So that means the roles and policies are good.
>>
>> When I am running flink, I am following the same configuration for job
>> manager and task manager as provided here:
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>>
>> The exception we are getting is -
>> org.apache.flink.fs.s3presto.shaded.com.amazonaws.SDKClientException:
>> Unable to load credentials from service end point.
>>
>> This happens in the EC2CredentialFetcher class method fetchCredentials -
>> line number 66, when it tries to read resource, effectively executing
>> CURL 169.254.170.2/AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
>>
>> I am not setting the variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
>> because its not the right way to do it for us, we are on EKS. Similarly any
>> of the ~/.aws/credentials file approach will also not work for us.
>>
>>
>> Atm, I haven't tried the kuberenetes service account property you
>> mentioned above. I will try and let you know how it goes.
>>
>> Question - do i need to provide any parameters while building the docker
>> image or any configuration in the flink config to tell flink that for all
>> purposes it should be using the service account and not try to get into
>> the EC2CredentialFetcher class.
>>
>> One more thing - we were trying this on the 1.6 version of Flink and not
>> the 1.12 version.
>>
>> Regards,
>> Swagat
>>
>> On Sun, Apr 4, 2021 at 8:56 AM Sameer Wadkar  wrote:
>>
>>> Kube2Iam needs to modify IPtables to proxy calls to ec2 metadata to a
>>> daemonset which runs privileged pods which maps a IP Address of the pods
>>> and its associated service account to make STS calls and return temporary
>>> AWS credentials. Your pod “thinks” the ec2 metadata url works locally like
>>> in an ec2 instance.
>>>
>>> I have found that mutating webhooks are easier to deploy (when you have
>>> no control over the Kubernetes environment - say you cannot change iptables
>>> or run privileged pods). These can configure the ~/.aws/credentials file.
>>> The webhook can make the STS call for the service account to role mapping.
>>> A side car container to which the main container has no access can even
>>> renew credentials becoz STS returns temp credentials.
>>>
>>> Sent from 

Re: Flink - Pod Identity

2021-04-05 Thread Austin Cawley-Edwards
Hi Swagat,

It looks like Flink 1.6 bundles the 1.11.165 version of the
aws-java-sdk-core with the Presto implementation (transitively from Presto
0.185[1]).
The minimum supported version for the ServiceAccount authentication approach
is 1.11.704 (see [2]), which was released on Jan 9th, 2020[3], long after
Flink 1.6 was released. It looks like even the most recent Presto is on a
version below that, concretely 1.11.697 in the master branch[4], so I don't
think even upgrading Flink to 1.6+ will solve this, though it looks to me
like the AWS dependency is managed better in more recent Flink versions.
I'll have more for you on that front tomorrow, after the Easter break.

I think what you would have to do to make this authentication approach work
for Flink 1.6 is building a custom version of the flink-s3-fs-presto jar,
replacing the bundled AWS dependency with the 1.11.704 version, and then
shading it the same way.

In the meantime, would you mind creating a JIRA ticket with this use case?
That'll give you the best insight into the status of fixing this :)

Let me know if that makes sense,
Austin

[1]:
https://github.com/prestodb/presto/blob/1d4ee196df4327568c0982811d8459a44f1792b9/pom.xml#L53
[2]:
https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
[3]: https://github.com/aws/aws-sdk-java/releases/tag/1.11.704
[4]: https://github.com/prestodb/presto/blob/master/pom.xml#L52

On Sun, Apr 4, 2021 at 3:32 AM Swagat Mishra  wrote:

> Austin -
>
> In my case the set up is such that services are deployed on Kubernetes
> with Docker, running on EKS. There is also an istio service mesh. So all
> the services communicate and access AWS resources like S3 using the service
> account. Service account is associated with IAM roles. I have verified that
> the service account has access to S3, by running a program that connects to
> S3 to read a file also aws client when packaged into the pod is able to
> access S3. So that means the roles and policies are good.
>
> When I am running flink, I am following the same configuration for job
> manager and task manager as provided here:
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>
> The exception we are getting is -
> org.apache.flink.fs.s3presto.shaded.com.amazonaws.SDKClientException:
> Unable to load credentials from service end point.
>
> This happens in the EC2CredentialFetcher class method fetchCredentials -
> line number 66, when it tries to read resource, effectively executing
> CURL 169.254.170.2/AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
>
> I am not setting the variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
> because its not the right way to do it for us, we are on EKS. Similarly any
> of the ~/.aws/credentials file approach will also not work for us.
>
>
> Atm, I haven't tried the kuberenetes service account property you
> mentioned above. I will try and let you know how it goes.
>
> Question - do i need to provide any parameters while building the docker
> image or any configuration in the flink config to tell flink that for all
> purposes it should be using the service account and not try to get into
> the EC2CredentialFetcher class.
>
> One more thing - we were trying this on the 1.6 version of Flink and not
> the 1.12 version.
>
> Regards,
> Swagat
>
> On Sun, Apr 4, 2021 at 8:56 AM Sameer Wadkar  wrote:
>
>> Kube2Iam needs to modify IPtables to proxy calls to ec2 metadata to a
>> daemonset which runs privileged pods which maps a IP Address of the pods
>> and its associated service account to make STS calls and return temporary
>> AWS credentials. Your pod “thinks” the ec2 metadata url works locally like
>> in an ec2 instance.
>>
>> I have found that mutating webhooks are easier to deploy (when you have
>> no control over the Kubernetes environment - say you cannot change iptables
>> or run privileged pods). These can configure the ~/.aws/credentials file.
>> The webhook can make the STS call for the service account to role mapping.
>> A side car container to which the main container has no access can even
>> renew credentials becoz STS returns temp credentials.
>>
>> Sent from my iPhone
>>
>> On Apr 3, 2021, at 10:29 PM, Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>> 
>> If you’re just looking to attach a service account to a pod using the
>> native AWS EKS IAM mapping[1], you should be able to attach the service
>> account to the pod via the `kubernetes.service-account` configuration
>> option[2].
>>
>> Let me know if that works for you!
>>
>> Best,
>> Austin
>>
>> [1]:
>> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#kubernetes-service-account
>>
>> On Sat, Apr 3, 2021 at 10:18 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Can you describe your setup a little bit more? And 

Application cluster - Job execution and cluster creation timeouts

2021-04-05 Thread Tamir Sagi
Hey all,

We deploy an application cluster natively on Kubernetes.

Are there any timeouts for job execution and cluster creation?

I went over the configuration page here but did not find anything relevant.

In order to get an indication about the cluster, we leverage the k8s client to watch the deployment in a namespace with a specific cluster name and respond accordingly.

We define two timeouts:

  1.  For creating the application cluster (i.e. for the case where there are errors in the pods, the k8s deployment is up, but the application cluster is not running).
  2.  Until the application cluster's resources get cleaned up (upon completion), which prevents an infinitely running job or k8s glitches.

However, this solution is not ideal, because if this client lib crashes, the timeouts are gone.
We don't want to manage these timeout states ourselves.

Any suggestion or better way?

Thanks,
Tamir.






Flink 1.12.0: how does Flink Streaming commit offsets back to Kafka?

2021-04-05 Thread guanyq
Kafka version 0.11.
When I check the consumer group's backlog, it reports that the consumer group does not exist.

Re: Checkpoint timeouts at times of high load

2021-04-05 Thread Geldenhuys, Morgan Karl
Thank you for the information. I have a feeling this has more to do with 
EXACTLY_ONCE Kafka producers and transactions not playing nicely with checkpoints, 
and a timeout happens. The jobs seem to fail and then hit this restart-and-fail 
loop. Looking in the logs, the taskmanager logs grow very large, with the same 
messages repeating over and over again. I've attached a file for this. The two 
lines that give me pause are:


Closing the Kafka producer with timeoutMillis = 0 ms.

Proceeding to force close the producer since pending requests could not be 
completed within timeout 0 ms.


I'm not really sure which timeout this is but it looks like there is a timeout 
loop happening here.


The Kafka producer has been configured as such (the transaction timeout has 
been set on the kafka server to match the producer):


Properties kafkaProducerProps = new Properties();
kafkaProducerProps.setProperty("bootstrap.servers", brokerList);
kafkaProducerProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
"360");
kafkaProducerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
 "5");
kafkaProducerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
UUID.randomUUID().toString());
kafkaProducerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
"true");
kafkaProducerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "500");
kafkaProducerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
kafkaProducerProps.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
kafkaProducerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
"3");
kafkaProducerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
"12");

FlinkKafkaProducer<String> myProducer =
new FlinkKafkaProducer<>(
producerTopic,
(KafkaSerializationSchema<String>) (value, aLong) -> {
return new ProducerRecord<>(producerTopic, value.getBytes());
},
kafkaProducerProps,
Semantic.EXACTLY_ONCE,
10);


And checkpoints have been configured as such:


StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// configuring RocksDB state backend to use HDFS
String backupFolder = props.getProperty("hdfs.backupFolder");
StateBackend backend = new RocksDBStateBackend(backupFolder, true);
env.setStateBackend(backend);
// start a checkpoint based on supplied interval
env.enableCheckpointing(checkpointInterval);
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(checkpointInterval);
// checkpoints have to complete within two minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(38);
//env.getCheckpointConfig().setTolerableCheckpointFailureNumber();
// no external services which could take some time to respond, therefore 1
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are deleted after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);


Additionally, each taskmanager has been configured with 4 GB of memory; there is 
a sliding window of 10 seconds with a slide of 1 second, and the cluster setup 
is using Flink native deployment.


Any hints would be much appreciated!


Regards,

M.



From: Guowei Ma 
Sent: 01 April 2021 14:19
To: Geldenhuys, Morgan Karl
Cc: user
Subject: Re: Checkpoint timeouts at times of high load

Hi,
I think there are many reasons that could lead to the checkpoint timeout.
Would you like to share some detailed information about the checkpoints? For example, 
the detailed checkpoint information from the web UI [1]. And which Flink version 
do you use?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/checkpoint_monitoring.html

Best,
Guowei


On Thu, Apr 1, 2021 at 4:33 PM Geldenhuys, Morgan Karl wrote:

Hi Community,


I have a number of Flink jobs running inside my session cluster with varying 
checkpoint intervals plus a large amount of operator state, and in times of high 
load the jobs fail due to checkpoint timeouts (set to 6 minutes). I can only 
assume this is because the latencies for saving checkpoints increase at these times of 
high load. I have a 30-node HDFS cluster for checkpoints... however, I 
see that only 4 of these nodes are being used for storage. Is there a way of 
ensuring the load is evenly spread? Could there be another reason for these 
checkpoint timeouts? Events are being consumed from Kafka to Kafka with 
EXACTLY_ONCE guarantees enabled.


Thank you very much!


M.
2021-04-05 07:38:16,437 INFO  org.apache.kafka.clients.producer.KafkaProducer  [] - [Producer clientId=producer-Sink: