Re: PyFlink resource optimization question, seeking advice

2021-04-05 Post by Dian Fu
The processing logic itself looks fine.

1) Could you describe the data-latency problem you mentioned in more detail? What QPS do you reach now, and what is the target?
2) Which deployment mode are you using?
3) For setting the parallelism, see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/table_environment.html#configuration
4) For the memory-related configuration options, see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#memory-configuration; they can be set the same way as in 3). A sketch follows this list.
5) As for the actual parallelism/memory values, that depends entirely on your business logic and the QPS you need to reach; you will have to benchmark to find suitable settings.
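
As a minimal illustration of 3) and 4) (a sketch, assuming PyFlink 1.12 with the blink planner; the values are placeholders to be tuned as per 5)):

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = TableEnvironment.create(env_settings)
config = t_env.get_config().get_configuration()

# 3) default parallelism for the job
config.set_string("parallelism.default", "8")                    # placeholder value
# 4) memory options from the linked docs
config.set_string("taskmanager.memory.process.size", "4g")       # placeholder value
config.set_string("taskmanager.memory.managed.fraction", "0.4")  # placeholder value

Note that on a real cluster the taskmanager.memory.* options are usually passed at submission time (flink-conf.yaml or -D options) rather than set in code.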


> On Apr 6, 2021, at 11:43 AM, 郭华威 wrote:
> 
> hidden email
> 
> On 2021-04-06 11:36, 苗红宾 wrote:
> Hello:
> The business scenario: the source is Kafka, with roughly 10 GB of data every 10 minutes covering 200 cities. We want a sliding window grouped by city that fires every 2 minutes, gathers each city's data from the past 10 minutes into its own list, converts it to pandas, and runs one whole-city computation; each city's computation takes about 60 s on average.
> 
> 
> Current approach:
> 1. slide_window = Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w")
> 2. The Kafka connector is registered with a SQL DDL statement.
> 3. The result table is a plain print sink:
> CREATE TABLE sink (
>   city_id STRING,
>   start_time TIMESTAMP,
>   end_time TIMESTAMP,
>   flag STRING
> ) WITH (
>   'connector' = 'print'
> )
> 4. A UDAF writes the source data to a CSV file via source.select("write_csv()"); the computation function is then called and reads the CSV file's contents.
> 5. The computation is triggered by calling the custom function strategy_flow_entry inside select:
> source.window(slide_window).group_by("w, city_id").select("strategy_flow_entry(concat_result)").execute_insert("sink").wait()
> 
> 
> 
> 
> Running this way keeps hitting problems of all kinds, such as data latency.
> 
> 
> So I would like to ask:
> 1. What is the recommended approach for this scenario? Is the current approach simply wrong?
> 2. How should the job-submission parameters be set: CPU cores, memory, parallelism, slots, etc.?
> 
> 
> Many thanks



(no subject)

2021-04-05 Post by 郭华威
Unsubscribe

Re: PyFlink resource optimization question, seeking advice

2021-04-05 Post by 郭华威
hidden email


PyFlink resource optimization question, seeking advice

2021-04-05 Post by 苗红宾
Hello:
The business scenario: the source is Kafka, with roughly 10 GB of data every 10 minutes covering 200 cities. We want a sliding window grouped by city that fires every 2 minutes, gathers each city's data from the past 10 minutes into its own list, converts it to pandas, and runs one whole-city computation; each city's computation takes about 60 s on average.


Current approach:
1. slide_window = Slide.over(f"10.minutes").every(f"2.minutes").on('ts').alias("w")
2. The Kafka connector is registered with a SQL DDL statement.
3. The result table is a plain print sink:
CREATE TABLE sink (
  city_id STRING,
  start_time TIMESTAMP,
  end_time TIMESTAMP,
  flag STRING
) WITH (
  'connector' = 'print'
)
4. A UDAF writes the source data to a CSV file via source.select("write_csv()"); the computation function is then called and reads the CSV file's contents.
5. The computation is triggered by calling the custom function strategy_flow_entry inside select:
source.window(slide_window).group_by("w, city_id").select("strategy_flow_entry(concat_result)").execute_insert("sink").wait()




Running this way keeps hitting problems of all kinds, such as data latency.


So I would like to ask:
1. What is the recommended approach for this scenario? Is the current approach simply wrong?
2. How should the job-submission parameters be set: CPU cores, memory, parallelism, slots, etc.?


Many thanks
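
Regarding question 1 above: since the per-city computation is already pandas-based, one possible direction (a sketch, not from the thread, assuming PyFlink 1.12) is a vectorized Pandas UDAF, which receives all rows of one city in the window as a pandas.Series and avoids the write_csv() round trip. strategy_flow_entry and concat_result follow the names used in the mail; do_city_level_computation is a hypothetical stand-in:

from pyflink.table import DataTypes
from pyflink.table.udf import udaf

@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def strategy_flow_entry(concat_result):
    # concat_result arrives as one pandas.Series holding every row of a single
    # (window, city_id) group, so the whole-city computation runs in one call
    # instead of going through a CSV file.
    return do_city_level_computation(concat_result)  # hypothetical helper

# registered on the job's TableEnvironment before use:
# t_env.create_temporary_function("strategy_flow_entry", strategy_flow_entry)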

Streaming read of a Hive table throws java.lang.ArrayIndexOutOfBoundsException: -1

2021-04-05 Post by 83315898
Hello,
1. Data is consumed from Kafka and written into Hive; the Hive table format is Parquet.

2. Streaming-reading the Hive table in the pipeline throws java.lang.ArrayIndexOutOfBoundsException: -1.
It reproduces in the flink sql client:
1) A plain select:
 select *
 from ubtCatalog.ubtHive.event_all_dwd
 /*+ OPTIONS('streaming-source.enable'='true',
'streaming-source.partition.include'='all',
'streaming-source.monitor-interval'='5s',
'streaming-source.consume-order'='partition-time','streaming-source.consume-start-offset'='2021-01-01')
 */
 ;

2) The same java.lang.ArrayIndexOutOfBoundsException: -1 as in 1):
 select count(xubtappid)
 from ubtCatalog.ubtHive.event_all_dwd
 /*+ OPTIONS('streaming-source.enable'='true',
'streaming-source.partition.include'='all',
'streaming-source.monitor-interval'='5s',
'streaming-source.consume-order'='partition-time','streaming-source.consume-start-offset'='2021-01-01')
 */
 ;

The error stack:
2021-04-02 10:06:26
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)
    at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:469)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
    at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'Source: HiveSource-ubtHive.event_all_dwd' (operator 
bc764cd8ddf7a0cff126f51c16239658).
    at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:466)
    at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
    at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:240)
    at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:247)
    at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:44)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.connectors.hive.FlinkHiveException: Failed to 
enumerate files
    at 
org.apache.flink.connectors.hive.ContinuousHiveSplitEnumerator.handleNewSplits(ContinuousHiveSplitEnumerator.java:148)
    at 
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$4(ExecutorNotifier.java:135)
    at 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
    ... 3 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at 
org.apache.flink.connectors.hive.util.

Re: After Flink starts, one slot on one TM does not work; it looks as if there is simply no communication at all.

2021-04-05 Post by yidan zhao
This problem has come up many times. There is now also a second case, one that does not happen at startup.
When it happens at startup, the symptom is that no watermark is shown, i.e. there is no watermark whatsoever.
In the other case the job runs normally after startup, and after some time (possibly a few hours, possibly many days) the watermark suddenly stalls indefinitely, causing endless backpressure.

yidan zhao wrote on Wed, Mar 3, 2021 at 2:47 PM:

> As in the subject; the logs:
> 2021-03-03 11:03:17,151 WARN org.apache.flink.runtime.util.HadoopUtils []
> - Could not find Hadoop configuration via any of the supported methods
> (Flink configuration, environment variables).
>
> 2021-03-03 11:03:17,344 WARN org.apache.hadoop.util.NativeCodeLoader [] -
> Unable to load native-hadoop library for your platform... using
> builtin-java classes where applicable
>
> 2021-03-03 11:03:17,441 WARN org.apache.flink.runtime.util.HadoopUtils []
> - Could not find Hadoop configuration via any of the supported methods
> (Flink configuration, environment variables).
>
> 2021-03-03 11:03:18,226 WARN
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
> SASL configuration failed: javax.security.auth.login.LoginException: No
> JAAS configuration section named 'Client' was found in specified JAAS
> configuration file:
> '/home/work/antibotFlink/flink-1.12.0/tmp/jaas-1092430908919603833.conf'.
> Will continue connection to Zookeeper server without SASL authentication,
> if Zookeeper server allows it.
>
> 2021-03-03 11:03:18,227 ERROR
> org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] -
> Authentication failed
>
> 2021-03-03 11:03:18,957 ERROR akka.remote.transport.netty.NettyTransport
> [] - failed to bind to /0.0.0.0:2027, shutting down Netty transport
> 2021-03-03 11:03:18,973 ERROR akka.remote.Remoting [] - Remoting system
> has been terminated abrubtly. Attempting to shut down transports
>
> As above, there is an error about binding the port; does anyone know the cause?
> The direct symptom and impact: one of my source tasks produces no output at all ("no output" here includes any data; bytes
> sent stays 0). Downstream nodes therefore get no watermark, and backpressure sits permanently at 1 (which also produces a scenario I had already found strange: backpressured to the point of doing no work, with the CPU no longer utilized at all...).
>


Re: Flink Kubernetes application keeps restarting TaskManagers

2021-04-05 Post by Yang Wang
With a CPU request that small, K8s enforces the limit strictly.

I suspect the TM starts so slowly that it keeps timing out before it can register, and therefore fails; you can check the TM log to confirm.

Also, judging from the log you posted, the rest endpoint should have started successfully and can be accessed.

Best,
Yang

casel.chen wrote on Mon, Apr 5, 2021 at 10:05 AM:

> While recently trying out flink kubernetes application mode, I found that TMs keep being requested and then terminated, and that the LoadBalancer-type Rest service never becomes ready, so the flink web ui cannot be viewed. The k8s logs are below. What is the cause? Is it because the resources I requested are too small?
>
>
> = Startup parameters
> "kubernetes.jobmanager.cpu": "0.1",
> "kubernetes.taskmanager.cpu": "0.1",
> "taskmanager.numberOfTaskSlots": "1",
> "jobmanager.memory.process.size": "1024m",
> "taskmanager.memory.process.size": "1024m",
>
>
> = k8s logs
>
>
>
> 2021-04-05 09:55:14,777 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] -
> JobManager successfully registered at ResourceManager, leader id:
> 9903e058fb5ca6f418c78dafcad048f1.
> 2021-04-05 09:55:14,869 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Registered job manager 84684c706147dc0958ff0929c8024...@akka.tcp://
> flink@172.17.0.5:6123/user/rpc/jobmanager_2 for job
> .
> 2021-04-05 09:55:14,869 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Registered job manager 84684c706147dc0958ff0929c8024...@akka.tcp://
> flink@172.17.0.5:6123/user/rpc/jobmanager_2 for job
> .
> 2021-04-05 09:55:14,870 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] -
> Requesting new slot [SlotRequestId{3bcf44c03f742d211b5abcc9d0d35068}] and
> profile ResourceProfile{UNKNOWN} with allocation id
> 17bcd11a1d493155e3ed45cfd200be79 from resource manager.
> 2021-04-05 09:55:14,871 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Registered job manager 84684c706147dc0958ff0929c8024...@akka.tcp://
> flink@172.17.0.5:6123/user/rpc/jobmanager_2 for job
> .
> 2021-04-05 09:55:14,871 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Request slot with profile ResourceProfile{UNKNOWN} for job
>  with allocation id
> 17bcd11a1d493155e3ed45cfd200be79.
> 2021-04-05 09:55:14,974 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requesting new worker with resource spec WorkerResourceSpec {cpuCores=0.1,
> taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb
> (241591914 bytes)}, current pending count: 1.
> 2021-04-05 09:55:15,272 INFO
> org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
> Enabled external resources: []
> 2021-04-05 09:55:18,570 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Creating
> new TaskManager pod with name
> flink-k8s-native-application-cluster-taskmanager-1-1 and resource
> <1024,0.1>.
> 2021-04-05 09:55:22,669 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod
> flink-k8s-native-application-cluster-taskmanager-1-1 is created.
> 2021-04-05 09:55:22,670 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received
> new TaskManager pod: flink-k8s-native-application-cluster-taskmanager-1-1
> 2021-04-05 09:55:22,770 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requested worker flink-k8s-native-application-cluster-taskmanager-1-1 with
> resource spec WorkerResourceSpec {cpuCores=0.1, taskHeapSize=25.600mb
> (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb
> (67108864 bytes), managedMemSize=230.400mb (241591914 bytes)}.
> 2021-04-05 09:56:35,494 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker flink-k8s-native-application-cluster-taskmanager-1-1 with resource
> spec WorkerResourceSpec {cpuCores=0.1, taskHeapSize=25.600mb (26843542
> bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes),
> managedMemSize=230.400mb (241591914 bytes)} was requested in current
> attempt and has not registered. Current pending count after removing: 0.
> 2021-04-05 09:56:35,494 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Worker flink-k8s-native-application-cluster-taskmanager-1-1 is terminated.
> Diagnostics: Pod terminated, container termination statuses:
> [flink-task-manager(exitCode=1, reason=Error, message=null)]
> 2021-04-05 09:56:35,495 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Requesting new worker with resource spec WorkerResourceSpec {cpuCores=0.1,
> taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes,
> networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb
> (241591914 bytes)}, current pending count: 1.
> 2021-04-05 09:56:35,496 INFO
> org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
> Enabled external resources: []
> 2021-04-05 09:56:35,498 INFO
> org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Creating
> new

Flink 1.2: what is the difference between partitioning by key and hash partitioning?

2021-04-05 Post by 刘文


Flink 1.2: what is the difference between partitioning by key and hash partitioning?
For example: partition index = hash(key) % parallelism?

Why not just partition by the hash directly?


KeyGroupStreamPartitioner.java

@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
    K key;
    try {
        key = keySelector.getKey(record.getInstance().getValue());
    } catch (Exception e) {
        throw new RuntimeException(
                "Could not extract key from " + record.getInstance().getValue(), e);
    }
    // key -> key group -> parallel operator index
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(
            key, maxParallelism, numberOfChannels);
}
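
Background on the question (not from the thread): keyBy does not compute hash(key) % parallelism directly. The key's hashCode is murmur-hashed into one of maxParallelism key groups, and each key group is mapped to an operator index; assignKeyToParallelOperator above chains these two steps. The indirection exists so that keyed state can be redistributed in whole key-group ranges when the job is rescaled, instead of remapping nearly every key as a plain modulo would. A minimal Python sketch of the logic in KeyGroupRangeAssignment (mmh3 is a third-party stand-in for Flink's MathUtils.murmurHash, so exact values differ):

import mmh3  # pip install mmh3; stands in for Flink's MathUtils.murmurHash

def assign_to_key_group(key, max_parallelism):
    # key -> key group: murmur-hash the key's hash, modulo maxParallelism
    return mmh3.hash(str(hash(key)), signed=False) % max_parallelism

def compute_operator_index_for_key_group(max_parallelism, parallelism, key_group_id):
    # key group -> operator index: each subtask owns a contiguous range of key groups
    return key_group_id * parallelism // max_parallelism

def assign_key_to_parallel_operator(key, max_parallelism, parallelism):
    return compute_operator_index_for_key_group(
        max_parallelism, parallelism, assign_to_key_group(key, max_parallelism))

# Rescaling from parallelism 4 to 8 only moves whole key-group ranges between
# subtasks; with a plain hash % parallelism, nearly every key would move.
print(assign_key_to_parallel_operator("city_42", 128, 4))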

Flink 1.12.0: how does FlinkStreaming commit offsets to Kafka?

2021-04-05 Post by guanyq
The Kafka version is 0.11.
Currently, checking the consumer group's backlog (lag) reports that the consumer group does not exist.
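
The usual answer, as a hedged sketch (the topic, broker, and group names below are made up): the Flink Kafka consumer commits offsets back to Kafka on checkpoint completion, and only when checkpointing is enabled and a group.id is configured; until a commit happens, lag tooling will report the consumer group as nonexistent. In PyFlink 1.12's DataStream API, for example:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60000)  # offsets are committed when a checkpoint completes

consumer = FlinkKafkaConsumer(
    topics='my_topic',                               # made-up name
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'broker:9092',  # made-up address
                'group.id': 'my_group'})             # needed for the group to exist
consumer.set_commit_offsets_on_checkpoints(True)     # the default, shown explicitly

env.add_source(consumer).print()
env.execute('kafka-offset-commit-demo')

Note that these commits exist for monitoring purposes; on recovery Flink restores from the offsets in its checkpoints, not from the ones committed to Kafka.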