Re: flink sql在实时数仓中,关联hbase维表频繁变化的问题

2020-08-25 Thread Jim Chen
  我们的维表,大概10个亿左右。每秒大概有4万的请求,要去查询,所以mysql扛不住。 还有就是维表数据变化后,需要秒级进行更新和关联的

china_tao  于2020年8月18日周二 下午11:13写道:

> 个人觉得还是取舍的问题,我们现在用flink sql 做实时数仓,维度表暂时用mysql,与业务商定好更新事件后,配置flink sql
> jdbc的lookup.cache.ttl参数来设置刷新时间,不知道你项目中,是维表数据变更后,需要秒级关联到消息中?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Flink-1.11.1 Application-Mode提交测试

2020-08-25 Thread amen...@163.com
hi, everyone

当我把jar包都上传至hdfs时,使用如下命令进行application mode提交,

./bin/flink run-application -t yarn-application 
-Dyarn.provided.lib.dirs="hdfs:///user/flink/lib" -c 
com.yui.flink.demo.Kafka2Mysql hdfs:///user/flink/app_jars/kafka2mysql.jar

报异常如下:

 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy 
Yarn Application Cluster
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:414)
at 
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:64)
at 
org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:197)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:919)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1598223665550_0009 failed 1 
times (global limit =2; local limit is =1) due to AM Container for 
appattempt_1598223665550_0009_01 exited with  exitCode: -1
Failing this attempt.Diagnostics: [2020-08-25 15:12:48.975]Destination must be 
relative
For more detailed output, check the application tracking page: 
http://ck233:8088/cluster/app/application_1598223665550_0009 Then click on 
links to logs of each attempt.
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further 
investigate the issue:
yarn logs -applicationId application_1598223665550_0009
at 
org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1021)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:524)
at 
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:407)
... 9 more

其他没有任何的错误了,使用run -m yarn-cluster是可以正常提交的

best,
amenhub


Re: 回复:flink1.11 sql问题

2020-08-25 Thread taochanglian

flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。

在 2020/8/25 14:59, 酷酷的浑蛋 写道:

还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?




在2020年08月25日 14:05,酷酷的浑蛋 写道:
我知道了




在2020年08月25日 13:58,酷酷的浑蛋 写道:




flink1.11 
读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?




flink webui前端

2020-08-25 Thread 罗显宴
大家好,请问flink的webui前端实现的源码在哪呀


| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制

回复: flink1.11 sql问题

2020-08-25 Thread 酷酷的浑蛋
关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗




在2020年08月25日 15:34,taochanglian 写道:
flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。

在 2020/8/25 14:59, 酷酷的浑蛋 写道:
还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?




在2020年08月25日 14:05,酷酷的浑蛋 写道:
我知道了




在2020年08月25日 13:58,酷酷的浑蛋 写道:




flink1.11 
读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?


请教一下flink链接hive的权限控制

2020-08-25 Thread faaron zheng
Hi all, 我在使用flink sql-client链接hive 
metastore的时候,发现好像没有做任何权限控制,可以访问所有的表?这一块是没做么?有什么计划么?

从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

2020-08-25 Thread yang zhang
请教下开发者:
从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

是不支持pojo格式流注册成表吗?只能是Row格式吗?

下面是代码

//1.创建执行环境
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();

//全局参数设置
streamEnv.getConfig().setGlobalJobParameters(parameters2);

//table env
StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);


//2.读取kafka | other source

// DataStream dataStream = null;

// if ("kafka".equalsIgnoreCase(sourceType)) {

//用jsonString反序列化

// dataStream = FlinkUtils.createKafkaStream(parameters2, 
SimpleStringSchema.class);

// }

//###定义消费kafka source##
Properties props = new Properties();
//指定Ka fka的Broker地址
props.setProperty("bootstrap.servers", 
parameters2.getRequired("bootstrap.servers"));
//指定组ID
props.setProperty("group.id", parameters2.get("group.id"));
//如果没有记录偏移量,第一次从最开始消费
// props.setProperty("auto.offset.reset", 
parameters.get("auto.offset.reset","earliest"));
//kafka的消费者不自动提交偏移量
props.setProperty("enable.auto.commit", 
parameters2.get("enable.auto.commit","false"));

List topics = 
Arrays.asList(parameters2.get("topics").split(","));



//new KafkaSource instance
FlinkKafkaConsumer kafkaConsumer = new 
FlinkKafkaConsumer(
topics,
SimpleStringSchema.class.newInstance(),
props);

//得到kafka流
DataStreamSource dataStream = 
streamEnv.addSource(kafkaConsumer);

//3.映射为实体
SingleOutputStreamOperator map = dataStream.map(new 
Map2EntityFunction()).returns(Class.forName(sourceClass));

//4.注册一个实例获取column names
Class clz = Class.forName(sourceClass);
Object vo = clz.newInstance();
StringBuilder columnBuilder = new StringBuilder();
Field[] declaredFields = vo.getClass().getDeclaredFields();
for (int i = 0; i < declaredFields.length; i++) {
String fieldName = declaredFields[i].getName();
columnBuilder.append(fieldName);
if (i < declaredFields.length - 1) {
columnBuilder.append(",");
}
}

String fieldsDeclare = columnBuilder.toString();

System.err.println(fieldsDeclare);

//5.注册数据表 --注意! 【这里的表名和字段名需要和待处理的执行表达式对应上,对应不上查询会报错】
tEnv.registerDataStream(sourceName, map,fieldsDeclare);

//6.执行语句
Table table = tEnv.sqlQuery(executiveSql);

//7.print
tEnv.toAppendStream(table, Row.class).print();//运行时这里不会打印出结果

   
//8.execute 
streamEnv.execute(jobName);
  
  
  
  
---
  
/**
 * 
 * 根据传入的映射类返回一个通用的POJO流
 */
public class Map2EntityFunction extends RichMapFunction {


@Override
public T map(String s) throws Exception {
System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印
ParameterTool params = (ParameterTool) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
String sourceClass = params.getRequired("sourceClass");
Preconditions.checkNotNull(sourceClass);
Class clz = (Class) Class.forName(sourceClass);
return JsonUtil.json2object(s, clz);
}


}
  
---
  




从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

2020-08-25 Thread yang zhang
从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

请问是不支持pojo流注册表吗?只能是Row类型吗?

下面是相关代码



//1.创建执行环境
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();

//全局参数设置
streamEnv.getConfig().setGlobalJobParameters(parameters2);

//table env
StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);


//2.读取kafka | other source

//DataStream dataStream = null;

//if ("kafka".equalsIgnoreCase(sourceType)) {

//用jsonString反序列化

//dataStream = FlinkUtils.createKafkaStream(parameters2, 
SimpleStringSchema.class);

//}

//###定义消费kafka source##
Properties props = new Properties();
//指定Ka fka的Broker地址
props.setProperty("bootstrap.servers", 
parameters2.getRequired("bootstrap.servers"));
//指定组ID
props.setProperty("group.id", parameters2.get("group.id"));
//如果没有记录偏移量,第一次从最开始消费
//props.setProperty("auto.offset.reset", 
parameters.get("auto.offset.reset","earliest"));
//kafka的消费者不自动提交偏移量
props.setProperty("enable.auto.commit", 
parameters2.get("enable.auto.commit","false"));

List topics = 
Arrays.asList(parameters2.get("topics").split(","));



//new KafkaSource instance
FlinkKafkaConsumer kafkaConsumer = new 
FlinkKafkaConsumer(
topics,
SimpleStringSchema.class.newInstance(),
props);

//得到kafka流
DataStreamSource dataStream = 
streamEnv.addSource(kafkaConsumer);

//3.映射为实体
SingleOutputStreamOperator map = dataStream.map(new 
Map2EntityFunction()).returns(Class.forName(sourceClass));

//4.注册一个实例获取column names
Class clz = Class.forName(sourceClass);
Object vo = clz.newInstance();
StringBuilder columnBuilder = new StringBuilder();
Field[] declaredFields = vo.getClass().getDeclaredFields();
for (int i = 0; i < declaredFields.length; i++) {
String fieldName = declaredFields[i].getName();
columnBuilder.append(fieldName);
if (i < declaredFields.length - 1) {
columnBuilder.append(",");
}
}

String fieldsDeclare = columnBuilder.toString();

System.err.println(fieldsDeclare);

//5.注册数据表 --注意! 【这里的表名和字段名需要和待处理的执行表达式对应上,对应不上查询会报错】
tEnv.registerDataStream(sourceName, map,fieldsDeclare);

//6.执行语句
Table table = tEnv.sqlQuery(executiveSql);

//7.print
tEnv.toAppendStream(table, Row.class).print();//运行时这里不会打印出结果

   
//8.execute 
streamEnv.execute(jobName);




---

/**
 * 
 * 根据传入的映射类返回一个通用的POJO流
 */
public class Map2EntityFunction extends RichMapFunction {


@Override
public T map(String s) throws Exception {
System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印
ParameterTool params = (ParameterTool) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
String sourceClass = params.getRequired("sourceClass");
Preconditions.checkNotNull(sourceClass);
Class clz = (Class) Class.forName(sourceClass);
return JsonUtil.json2object(s, clz);
}


}

---
 

Flink??????????????????????

2020-08-25 Thread Sun_yijia
??A??B??AB??
??B??ABA


??Flink??AB

Re: flink1.11 sql问题

2020-08-25 Thread Jim Chen
这个需要你自定义UDF

酷酷的浑蛋  于2020年8月25日周二 下午3:46写道:

> 关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗
>
>
>
>
> 在2020年08月25日 15:34,taochanglian 写道:
> flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。
>
> 在 2020/8/25 14:59, 酷酷的浑蛋 写道:
> 还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?
>
>
>
>
> 在2020年08月25日 14:05,酷酷的浑蛋 写道:
> 我知道了
>
>
>
>
> 在2020年08月25日 13:58,酷酷的浑蛋 写道:
>
>
>
>
> flink1.11
> 读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?
>


Re: flink webui前端

2020-08-25 Thread Jim Chen
flinlk-runtime-web,这个module下

罗显宴 <15927482...@163.com> 于2020年8月25日周二 下午3:43写道:

> 大家好,请问flink的webui前端实现的源码在哪呀
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制


Re: flink1.11 sql问题

2020-08-25 Thread zilong xiao
直接CAST不可以吗?

酷酷的浑蛋  于2020年8月25日周二 下午3:46写道:

> 关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗
>
>
>
>
> 在2020年08月25日 15:34,taochanglian 写道:
> flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。
>
> 在 2020/8/25 14:59, 酷酷的浑蛋 写道:
> 还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?
>
>
>
>
> 在2020年08月25日 14:05,酷酷的浑蛋 写道:
> 我知道了
>
>
>
>
> 在2020年08月25日 13:58,酷酷的浑蛋 写道:
>
>
>
>
> flink1.11
> 读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?
>


回复:请教一下flink链接hive的权限控制

2020-08-25 Thread xiaoyan hua
我们当前用的是kerberos认证,需要额外配置什么么? xiaoyan hua 邮箱:xiaoyanhua...@gmail.com 签名由 网易邮箱大师 
定制 在2020年08月25日 15:54,faaron zheng 写道: Hi all, 我在使用flink sql-client链接hive 
metastore的时候,发现好像没有做任何权限控制,可以访问所有的表?这一块是没做么?有什么计划么?

关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-25 Thread Jim Chen
大家好:
我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
%msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
这个配置文件,是整个项目的,是基于Yarn的per job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?


Re: 报错 Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-25 Thread song wang
hi, Xintong:

我仔细查看了下日志,发现在报错"Could not resolve ResourceManager address"之前有如下日志:

2020-08-22 05:39:24,473 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - The heartbeat of ResourceManager with id
6724e1ef8ee1c5fe5212eec6182319b6 timed out.
2020-08-22 05:39:24,473 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Close ResourceManager connection
6724e1ef8ee1c5fe5212eec6182319b6: The heartbeat of ResourceManager with id
6724e1ef8ee1c5fe5212eec6182319b6 timed out..

之后就一直报错 "Could not resolve ResourceManager address" 了,
看了下flink 1.9.0 版本的代码,是在rpcService.connect() 时报的错,
可是之后就没有日志输出了,单从报错信息来看只是说无法解析地址,
可是resourcemanager地址是没有问题的。

请问有没有办法可以查看resourcemanager的健康状况呢?


// 代码:
package org.apache.flink.runtime.registration;
public abstract class RetryingRegistration {
   public void startRegistration() {
 ...
 if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
rpcGatewayFuture = (CompletableFuture) rpcService.connect(
   targetAddress,
   fencingToken,
   targetType.asSubclass(FencedRpcGateway.class));
 } else {
// 连接resourcemanager
rpcGatewayFuture = rpcService.connect(targetAddress,
targetType);
 }
 ...
 rpcGatewayAcceptFuture.whenCompleteAsync(
(Void v, Throwable failure) -> {
   if (failure != null && !canceled) {
  final Throwable strippedFailure =
ExceptionUtils.stripCompletionException(failure);
  if (log.isDebugEnabled()) {
 ...
  } else {
 // 报错
 log.info(
"Could not resolve {} address {}, retrying in {}
ms:
{}.",targetName,targetAddress,retryingRegistrationConfiguration.getErrorDelayMillis(),strippedFailure.getMessage());
  }
  // 重连

startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
   }
},
rpcService.getExecutor());
   }
}


==
以下是一次提交job的完成报错日志
==
2020-08-25 16:02:55,737 INFO
 org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
JobGraph submission 4a324bc1e1eeb964116686e568cea8ad (Streaming WordCount).
2020-08-25 16:02:55,738 INFO
 org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting
job 4a324bc1e1eeb964116686e568cea8ad (Streaming WordCount).
2020-08-25 16:02:56,552 INFO
 org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
Added SubmittedJobGraph(4a324bc1e1eeb964116686e568cea8ad) to ZooKeeper.
2020-08-25 16:02:56,554 INFO
 org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/jobmanager_42 .
2020-08-25 16:02:56,554 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Initializing job Streaming WordCount
(4a324bc1e1eeb964116686e568cea8ad).
2020-08-25 16:02:56,555 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Using restart strategy
FailureRateRestartStrategy(failuresInterval=30 msdelayInterval=1
msmaxFailuresPerInterval=10) for Streaming WordCount
(4a324bc1e1eeb964116686e568cea8ad).
2020-08-25 16:02:56,555 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
recovers via failover strategy: New Pipelined Region Failover
2020-08-25 16:02:56,555 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Running initialization on master for job Streaming
WordCount (4a324bc1e1eeb964116686e568cea8ad).
2020-08-25 16:02:56,555 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Successfully ran initialization on master in 0 ms.
2020-08-25 16:02:56,556 INFO
 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
 - Start building failover regions.
2020-08-25 16:02:56,556 INFO
 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
 - Created 1 failover regions.
2020-08-25 16:02:56,559 INFO  org.apache.flink.runtime.util.ZooKeeperUtils
 - Initialized ZooKeeperCompletedCheckpointStore in
'/checkpoints/4a324bc1e1eeb964116686e568cea8ad'.
2020-08-25 16:02:56,560 INFO  org.apache.flink.runtime.jobmaster.JobMaster
 - Loading state backend via factory
org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2020-08-25 16:02:56,560 INFO
 org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using
predefined options: DEFAULT.
2020-08-25 16:02:56,560 INFO
 org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using
default options factory:
DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-08-25 1

Re: 有没有可能使用tikv作为flink 分布式的backend

2020-08-25 Thread wxpcc
感谢解答

就像你说的,的确可以 用lookup方式实现一部分公用kv的共享

我的理解现有的 rocksdb backend 为:rocksdb+hdfs , 如果是变成:rocksdb+tikv ,这样在一些应用过程中产生的
kv指标数据最终会存储到 tikv之中,外部也有可能访问到,通过 lookup的方式,不知道这样是否可行





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 报错 Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-25 Thread Xintong Song
>
> 出现这个报错后就提交不了任务了
>
我确认一下,你之前这句话的意思,是出现这个报错之后,新的作业不能提交了,还是新的作业能提交但是提交之后页报这个找不到 RM 的错?

从 RM 心跳超时但是整个进程还在运行这个现象来看,比较符合 RM leadership 丢失的情况,这种情况下 RM 会停止服务。
如果是新的作业干脆就无法提交了,也符合 rest server leadership 丢失的情况。

我目前怀疑是 HA 出现问题,导致 RM 和 rest server 都认为自己不再是 leader,但是又迟迟没有新的 leader 产生。所以对于
JobMaster,由于没有发现有新的 RM leader,就会一直尝试重连原来的 RM,而对于 rest server 的表现则是找不到新的
leader 无法提交新的作业。

Thank you~

Xintong Song



On Tue, Aug 25, 2020 at 4:50 PM song wang  wrote:

> hi, Xintong:
>
> 我仔细查看了下日志,发现在报错"Could not resolve ResourceManager address"之前有如下日志:
>
> 2020-08-22 05:39:24,473 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>  - The heartbeat of ResourceManager with id
> 6724e1ef8ee1c5fe5212eec6182319b6 timed out.
> 2020-08-22 05:39:24,473 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>  - Close ResourceManager connection
> 6724e1ef8ee1c5fe5212eec6182319b6: The heartbeat of ResourceManager with id
> 6724e1ef8ee1c5fe5212eec6182319b6 timed out..
>
> 之后就一直报错 "Could not resolve ResourceManager address" 了,
> 看了下flink 1.9.0 版本的代码,是在rpcService.connect() 时报的错,
> 可是之后就没有日志输出了,单从报错信息来看只是说无法解析地址,
> 可是resourcemanager地址是没有问题的。
>
> 请问有没有办法可以查看resourcemanager的健康状况呢?
>
>
> // 代码:
> package org.apache.flink.runtime.registration;
> public abstract class RetryingRegistration extends RpcGateway, S extends RegistrationResponse.Success> {
>public void startRegistration() {
>  ...
>  if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
> rpcGatewayFuture = (CompletableFuture) rpcService.connect(
>targetAddress,
>fencingToken,
>targetType.asSubclass(FencedRpcGateway.class));
>  } else {
> // 连接resourcemanager
> rpcGatewayFuture = rpcService.connect(targetAddress,
> targetType);
>  }
>  ...
>  rpcGatewayAcceptFuture.whenCompleteAsync(
> (Void v, Throwable failure) -> {
>if (failure != null && !canceled) {
>   final Throwable strippedFailure =
> ExceptionUtils.stripCompletionException(failure);
>   if (log.isDebugEnabled()) {
>  ...
>   } else {
>  // 报错
>  log.info(
> "Could not resolve {} address {}, retrying in {}
> ms:
>
> {}.",targetName,targetAddress,retryingRegistrationConfiguration.getErrorDelayMillis(),strippedFailure.getMessage());
>   }
>   // 重连
>
>
> startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
>}
> },
> rpcService.getExecutor());
>}
> }
>
>
>
> ==
> 以下是一次提交job的完成报错日志
>
> ==
> 2020-08-25 16:02:55,737 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
> JobGraph submission 4a324bc1e1eeb964116686e568cea8ad (Streaming WordCount).
> 2020-08-25 16:02:55,738 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting
> job 4a324bc1e1eeb964116686e568cea8ad (Streaming WordCount).
> 2020-08-25 16:02:56,552 INFO
>  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
> Added SubmittedJobGraph(4a324bc1e1eeb964116686e568cea8ad) to ZooKeeper.
> 2020-08-25 16:02:56,554 INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
> RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
> akka://flink/user/jobmanager_42 .
> 2020-08-25 16:02:56,554 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>  - Initializing job Streaming WordCount
> (4a324bc1e1eeb964116686e568cea8ad).
> 2020-08-25 16:02:56,555 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>  - Using restart strategy
> FailureRateRestartStrategy(failuresInterval=30 msdelayInterval=1
> msmaxFailuresPerInterval=10) for Streaming WordCount
> (4a324bc1e1eeb964116686e568cea8ad).
> 2020-08-25 16:02:56,555 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> recovers via failover strategy: New Pipelined Region Failover
> 2020-08-25 16:02:56,555 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>  - Running initialization on master for job Streaming
> WordCount (4a324bc1e1eeb964116686e568cea8ad).
> 2020-08-25 16:02:56,555 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>  - Successfully ran initialization on master in 0 ms.
> 2020-08-25 16:02:56,556 INFO
>
>  
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
>  - Start building failover regions.
> 2020-08-25 16:02:56,556 INFO
>
>  
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
>  - Created 1 failover regio

Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-25 Thread zilong xiao
1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
2:这些属性有办法可以从环境变量中获取

Jim Chen  于2020年8月25日周二 下午4:49写道:

> 大家好:
> 我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> 这个配置文件,是整个项目的,是基于Yarn的per job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
>


Re: 流处理任务中checkpoint失败

2020-08-25 Thread Congxian Qiu
Hi
   对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot
的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年8月25日周二 上午12:58写道:

> 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。
> 官方文档对于在iterative
> stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
> 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
> ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
>
> ---原始邮件---
> 发件人: "Congxian Qiu" 发送时间: 2020年8月24日(周一) 晚上8:21
> 收件人: "user-zh" 主题: Re: 流处理任务中checkpoint失败
>
>
> Hi
>    从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的
> checkpoint
> 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
>    另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。
>
> [1] https://zhuanlan.zhihu.com/p/87131964
> Best,
> Congxian
>
>
> Robert.Zhang <173603...@qq.com> 于2020年8月21日周五 下午6:31写道:
>
> > Hello all,
> > 目前遇到一个问题,在iterative stream job
> > 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
> > 测试state 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
> > Exceeded checkpoint tolerable failure threshold.的报错
> >
> >
> > 配置如下:
> > env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
> > CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > checkpointConfig.setCheckpointTimeout(60);
> > checkpointConfig.setMinPauseBetweenCheckpoints(6);
> > checkpointConfig.setMaxConcurrentCheckpoints(4);
> >
> >
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > checkpointConfig.setPreferCheckpointForRecovery(true);
> > checkpointConfig.setTolerableCheckpointFailureNumber(2);
> > checkpointConfig.enableUnalignedCheckpoints();
> >
> >
> > 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?


Re: 有没有可能使用tikv作为flink 分布式的backend

2020-08-25 Thread Yun Tang
Hi

这种思路我觉得是可以尝试的,不过目前看需要改动的地方很多:

  1.  需要更改RocksDB 创建checkpoint 到TiKV的代码逻辑
  2.  需要改动RocksDB 从checkpoint resume的代码逻辑
  3.  
如果想要数据可以TiKV可以读取,那么TiKV中存储的格式要么与RocksDB内存储的一样,那这样子的话,lookup时候,需要能够反序列化Flink在RocksDB中的存储格式;要么是重新的格式,但这样子会导致RocksDB的checkpoint流程和时间都会增长。
  4.  TiKV中的数据的更新依赖于checkpoint interval,不能做到实时更新

其实queryable state 也是一个可以实现你们类似目的的方式,不确定你们是否可以尝试。

祝好
唐云

From: wxpcc 
Sent: Tuesday, August 25, 2020 17:05
To: user-zh@flink.apache.org 
Subject: Re: 有没有可能使用tikv作为flink 分布式的backend

感谢解答

就像你说的,的确可以 用lookup方式实现一部分公用kv的共享

我的理解现有的 rocksdb backend 为:rocksdb+hdfs , 如果是变成:rocksdb+tikv ,这样在一些应用过程中产生的
kv指标数据最终会存储到 tikv之中,外部也有可能访问到,通过 lookup的方式,不知道这样是否可行





--
Sent from: http://apache-flink.147419.n8.nabble.com/


ProcessWindowFunction??????clear??????????????????-v1.10.1

2020-08-25 Thread x
ProcessWindowFunction??clearenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)??
.window(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
private var state: MapState[String,Boolean] = _
override def open
override def process
override def clear(ctx: Context): Unit = {
state.clear()
}
}

回复: flink1.11 sql问题

2020-08-25 Thread 酷酷的浑蛋
CREATE TABLE test (
a VARCHAR, 
b INT
 ) WITH (
'connector'='kafka', 
'topic'='test', 
'properties.bootstrap.servers'='xxx',
'properties.group.id'='groupid',
'scan.startup.mode'='group-offsets', 
'format'='json'
);
我说的是在source阶段,读过来的数据,如果某个key的值是嵌套json或者json数组,那么,format=json是获取不到这个字段值的,直接显示为空


在2020年08月25日 16:23,zilong xiao 写道:
直接CAST不可以吗?

酷酷的浑蛋  于2020年8月25日周二 下午3:46写道:

关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗




在2020年08月25日 15:34,taochanglian 写道:
flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。

在 2020/8/25 14:59, 酷酷的浑蛋 写道:
还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?




在2020年08月25日 14:05,酷酷的浑蛋 写道:
我知道了




在2020年08月25日 13:58,酷酷的浑蛋 写道:




flink1.11
读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?



回复: flink1.11 sql问题

2020-08-25 Thread 酷酷的浑蛋


还没到udf那一步,直接用create table的方式,过来的数据就是获取不到值的,
CREATE TABLE test (
a VARCHAR, 
b INT
 ) WITH (
'connector' = 'kafka', 
'topic' = 'test', 
'properties.bootstrap.servers' = 'xxx',
'properties.group.id' = 'groupid',
'scan.startup.mode' = 'group-offsets', 
'format'='json'
);




在2020年08月25日 16:14,Jim Chen 写道:
这个需要你自定义UDF

酷酷的浑蛋  于2020年8月25日周二 下午3:46写道:

关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗




在2020年08月25日 15:34,taochanglian 写道:
flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。

在 2020/8/25 14:59, 酷酷的浑蛋 写道:
还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?




在2020年08月25日 14:05,酷酷的浑蛋 写道:
我知道了




在2020年08月25日 13:58,酷酷的浑蛋 写道:




flink1.11
读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?



Re: flink1.11 sql问题

2020-08-25 Thread Benchao Li
Hi,

这个功能已经在1.12支持了[1],如果着急使用,可以cherry-pick回去试试看。
用法就是直接把这个字段声明为varchar,json format会帮你自动处理

[1] https://issues.apache.org/jira/browse/FLINK-18002

酷酷的浑蛋  于2020年8月25日周二 下午6:32写道:

>
>
> 还没到udf那一步,直接用create table的方式,过来的数据就是获取不到值的,
> CREATE TABLE test (
> a VARCHAR,
> b INT
>  ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'test',
> 'properties.bootstrap.servers' = 'xxx',
> 'properties.group.id' = 'groupid',
> 'scan.startup.mode' = 'group-offsets',
> 'format'='json'
> );
>
>
>
>
> 在2020年08月25日 16:14,Jim Chen 写道:
> 这个需要你自定义UDF
>
> 酷酷的浑蛋  于2020年8月25日周二 下午3:46写道:
>
> 关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗
>
>
>
>
> 在2020年08月25日 15:34,taochanglian 写道:
> flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。
>
> 在 2020/8/25 14:59, 酷酷的浑蛋 写道:
> 还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?
>
>
>
>
> 在2020年08月25日 14:05,酷酷的浑蛋 写道:
> 我知道了
>
>
>
>
> 在2020年08月25日 13:58,酷酷的浑蛋 写道:
>
>
>
>
> flink1.11
>
> 读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?
>
>

-- 

Best,
Benchao Li


Re: 报错 Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-25 Thread song wang
你好,
现在yarn-session上还是可以提交新作业的。只是运行时无法分配slot,报错无法解析 resourcemanager 地址。

如果是RM leadership丢失的话,
1. 怎么可以确认是leader丢失呢?
2. 通常是什么原因造成的呢?比如网络延迟?或者机器负载过高?
3. 有什么办法可以恢复吗?

Xintong Song  于2020年8月25日周二 下午5:26写道:

> >
> > 出现这个报错后就提交不了任务了
> >
> 我确认一下,你之前这句话的意思,是出现这个报错之后,新的作业不能提交了,还是新的作业能提交但是提交之后页报这个找不到 RM 的错?
>
> 从 RM 心跳超时但是整个进程还在运行这个现象来看,比较符合 RM leadership 丢失的情况,这种情况下 RM 会停止服务。
> 如果是新的作业干脆就无法提交了,也符合 rest server leadership 丢失的情况。
>
> 我目前怀疑是 HA 出现问题,导致 RM 和 rest server 都认为自己不再是 leader,但是又迟迟没有新的 leader 产生。所以对于
> JobMaster,由于没有发现有新的 RM leader,就会一直尝试重连原来的 RM,而对于 rest server 的表现则是找不到新的
> leader 无法提交新的作业。
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Aug 25, 2020 at 4:50 PM song wang 
> wrote:
>
> > hi, Xintong:
> >
> > 我仔细查看了下日志,发现在报错"Could not resolve ResourceManager address"之前有如下日志:
> >
> > 2020-08-22 05:39:24,473 INFO
> org.apache.flink.runtime.jobmaster.JobMaster
> >  - The heartbeat of ResourceManager with id
> > 6724e1ef8ee1c5fe5212eec6182319b6 timed out.
> > 2020-08-22 05:39:24,473 INFO
> org.apache.flink.runtime.jobmaster.JobMaster
> >  - Close ResourceManager connection
> > 6724e1ef8ee1c5fe5212eec6182319b6: The heartbeat of ResourceManager with
> id
> > 6724e1ef8ee1c5fe5212eec6182319b6 timed out..
> >
> > 之后就一直报错 "Could not resolve ResourceManager address" 了,
> > 看了下flink 1.9.0 版本的代码,是在rpcService.connect() 时报的错,
> > 可是之后就没有日志输出了,单从报错信息来看只是说无法解析地址,
> > 可是resourcemanager地址是没有问题的。
> >
> > 请问有没有办法可以查看resourcemanager的健康状况呢?
> >
> >
> > // 代码:
> > package org.apache.flink.runtime.registration;
> > public abstract class RetryingRegistration > extends RpcGateway, S extends RegistrationResponse.Success> {
> >public void startRegistration() {
> >  ...
> >  if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
> > rpcGatewayFuture = (CompletableFuture) rpcService.connect(
> >targetAddress,
> >fencingToken,
> >targetType.asSubclass(FencedRpcGateway.class));
> >  } else {
> > // 连接resourcemanager
> > rpcGatewayFuture = rpcService.connect(targetAddress,
> > targetType);
> >  }
> >  ...
> >  rpcGatewayAcceptFuture.whenCompleteAsync(
> > (Void v, Throwable failure) -> {
> >if (failure != null && !canceled) {
> >   final Throwable strippedFailure =
> > ExceptionUtils.stripCompletionException(failure);
> >   if (log.isDebugEnabled()) {
> >  ...
> >   } else {
> >  // 报错
> >  log.info(
> > "Could not resolve {} address {}, retrying in {}
> > ms:
> >
> >
> {}.",targetName,targetAddress,retryingRegistrationConfiguration.getErrorDelayMillis(),strippedFailure.getMessage());
> >   }
> >   // 重连
> >
> >
> >
> startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
> >}
> > },
> > rpcService.getExecutor());
> >}
> > }
> >
> >
> >
> >
> ==
> > 以下是一次提交job的完成报错日志
> >
> >
> ==
> > 2020-08-25 16:02:55,737 INFO
> >  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
> > JobGraph submission 4a324bc1e1eeb964116686e568cea8ad (Streaming
> WordCount).
> > 2020-08-25 16:02:55,738 INFO
> >  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
> Submitting
> > job 4a324bc1e1eeb964116686e568cea8ad (Streaming WordCount).
> > 2020-08-25 16:02:56,552 INFO
> >  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
> > Added SubmittedJobGraph(4a324bc1e1eeb964116686e568cea8ad) to ZooKeeper.
> > 2020-08-25 16:02:56,554 INFO
> >  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
> > RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
> > akka://flink/user/jobmanager_42 .
> > 2020-08-25 16:02:56,554 INFO
> org.apache.flink.runtime.jobmaster.JobMaster
> >  - Initializing job Streaming WordCount
> > (4a324bc1e1eeb964116686e568cea8ad).
> > 2020-08-25 16:02:56,555 INFO
> org.apache.flink.runtime.jobmaster.JobMaster
> >  - Using restart strategy
> > FailureRateRestartStrategy(failuresInterval=30 msdelayInterval=1
> > msmaxFailuresPerInterval=10) for Streaming WordCount
> > (4a324bc1e1eeb964116686e568cea8ad).
> > 2020-08-25 16:02:56,555 INFO
> >  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> > recovers via failover strategy: New Pipelined Region Failover
> > 2020-08-25 16:02:56,555 INFO
> org.apache.flink.runtime.jobmaster.JobMaster
> >  - Running initialization on master for job Streaming
> > WordCount (4a324bc1e1eeb964116686e568cea8ad).
> > 2020-08-25 16:02:

?????? flink 1.10.1 ???????? OutOfMemoryError: Metaspace

2020-08-25 Thread kcz
??ES5??pretty
 good??




--  --
??: 
   "user-zh"

https://www.yuque.com/codeleven/flink/dgygq2> ; 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 请教一下flink链接hive的权限控制

2020-08-25 Thread Rui Li
Hi,

Authentication的话支持kerberos,应该正常做kinit就可以了。或者可以设置flink
security相关的参数,如security.kerberos.login.keytab和security.kerberos.login.principal。具体可以参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems

Authorization目前HiveCatalog这边没有做。如果你的HMS启用了authorization(比如hive自身的SQL
standard authorization),那验证的动作应该发生在HMS端,对HiveCatalog也是生效的。

On Tue, Aug 25, 2020 at 4:48 PM xiaoyan hua  wrote:

> 我们当前用的是kerberos认证,需要额外配置什么么? xiaoyan hua 邮箱:xiaoyanhua...@gmail.com 签名由
> 网易邮箱大师 定制 在2020年08月25日 15:54,faaron zheng 写道: Hi all, 我在使用flink
> sql-client链接hive metastore的时候,发现好像没有做任何权限控制,可以访问所有的表?这一块是没做么?有什么计划么?



-- 
Best regards!
Rui Li


回复:flink1.11 sql问题

2020-08-25 Thread kcz
这个功能非常好的,因为第三方数据总是搞事情,动不动就加字段,改名字的。





-- 原始邮件 --
发件人: Benchao Li https://issues.apache.org/jira/browse/FLINK-18002

酷酷的浑蛋 

回复:流处理任务中checkpoint失败

2020-08-25 Thread Robert.Zhang
Hi Congxian,
测试的时候数据量是很小的,cpu使用比较低的,让我比较奇怪的一点是我杀掉任务重启的话,有时候能成功进行checkpoint,看了下日志,就是这个iteration
 
source成功执行了snapshot,发起了barrier,进而影响到后续operator的checkpoint。失败的时候是该source无法snapshot,直至超时。
因为flink这一块,iteration是由head tail组成,是一个比较特殊的stream 
task,目前还没有看到jm这边是如何对此进行处理的。这个iteration source其实是由其他source 
transform而来的,但是在dag图里这是作为一个source operator 存在的,不知道是否对于这个类型的operator 
taskid,barrier是否有特殊处理

---原始邮件---
发件人: "Congxian Qiu"https://zhuanlan.zhihu.com/p/87131964
> Best,
> Congxian
>
>
> Robert.Zhang <173603...@qq.com> 于2020年8月21日周五 下午6:31写道:
>
> > Hello all,
> > 目前遇到一个问题,在iterative stream job
> > 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
> > 测试state 
很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
> > Exceeded checkpoint tolerable failure threshold.的报错
> >
> >
> > 配置如下:
> > env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, 
true);
> > CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > checkpointConfig.setCheckpointTimeout(60);
> > checkpointConfig.setMinPauseBetweenCheckpoints(6);
> > checkpointConfig.setMaxConcurrentCheckpoints(4);
> >
> >
> 
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > checkpointConfig.setPreferCheckpointForRecovery(true);
> > checkpointConfig.setTolerableCheckpointFailureNumber(2);
> > checkpointConfig.enableUnalignedCheckpoints();
> >
> >
> > 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?

flink checkpoint导致反压严重

2020-08-25 Thread zhanglachun
大佬们好,我一个flink任务,计算一分钟内的某项几项指标的中位数,总共5个指标,因为中位数计算需要全窗口数据排序,所以计算比较复杂,现在遇到的问题的是一旦开启checkpoint任务就从source端开始反压严重,但关闭checkpoint就正常运行.
目前优化的步骤有:
1.语义放弃exactlyonce 改到atleast 
2.分析发现keyby过程中有数据倾斜,已改成分布聚合,在第一轮聚合中key后添加随机数,在去除key后缀进行第二轮聚合
3.计算过程中使用RoaringBitmap作为中间数据缓存容器,最大限度减少内存损耗
4.增大并行度,提交时增大-yjm -ytm 内存配置
5.调整ck间隔时间
经过以上一些优化,任务性能确实有提高,绝大部分时间能正常运行,但每到业务高峰期(本公司业务最高峰在每天上午10点,这个点的数据量是平常的7~8倍),反压就立马非常严重,直至任务挂掉,但也没有oom之类的有效的错误日志输出

以上优化在各个版本都有测试,1.9,1.10,1.11都是一样的问题,总的来说的就是ck导致任务反压严重,不开启ck时,就算不经过上面的优化也正常运行,包括10点的业务高峰点

请大佬们给点优化思路,目前生产只能放弃ck,但这终归不是解决办法



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink checkpoint导致反压严重

2020-08-25 Thread 徐骁
input
  .keyBy()
  .timeWindow()
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到
window 里面


Re: flink1.10中hive module 没有plus,greaterThan等函数

2020-08-25 Thread Chesnay Schepler

Moving this to the chinese user mailing  list.

On 25/08/2020 16:16, Andrey Zagrebin wrote:

Hi Faaron,

This mailing list is for support in English.
Could you translate your question into English?
You can also subscribe to the user mailing list in Chinese to get 
support in Chinese [1]


Best,
Andrey

[1] user-zh-subscr...@flink.apache.org 



On Fri, Aug 21, 2020 at 4:43 AM faaron zheng > wrote:


Hi all,

我在使用flink1.10的sql-client时候发现使用hive module时会缺少某些core
module 中的build-in function比如plus,greaterThan。这会导致同样的sql
core module可以执行成功,hive module却会报错,比如在使用row_number()
over()时候。这是什么原因?





flink1.11????????slot????

2020-08-25 Thread ??????
??Linux
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No 
pooled slot available and request to ResourceManager for new slot failed

Re: 从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印

2020-08-25 Thread yang zhang
我知道了,我的查询sql条件的问题,已经改好了。
谢谢

发自我的iPhone

> 在 2020年8月25日,16:12,yang zhang  写道:
> 
> 从kafka读取数据转化为Pojo类型的数据流注册成表,新消息来了,注册表的查询结果不打印
> 
> 请问是不支持pojo流注册表吗?只能是Row类型吗?
> 
> 下面是相关代码
> 
> 
> 
>//1.创建执行环境
>StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
>//全局参数设置
>streamEnv.getConfig().setGlobalJobParameters(parameters2);
> 
>//table env
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);
> 
> 
>//2.读取kafka | other source
> 
> //DataStream dataStream = null;
> 
> //if ("kafka".equalsIgnoreCase(sourceType)) {
> 
>//用jsonString反序列化
> 
> //dataStream = FlinkUtils.createKafkaStream(parameters2, 
> SimpleStringSchema.class);
> 
> //}
> 
>//###定义消费kafka source##
>Properties props = new Properties();
>//指定Ka fka的Broker地址
>props.setProperty("bootstrap.servers", 
> parameters2.getRequired("bootstrap.servers"));
>//指定组ID
>props.setProperty("group.id", parameters2.get("group.id"));
>//如果没有记录偏移量,第一次从最开始消费
> //props.setProperty("auto.offset.reset", 
> parameters.get("auto.offset.reset","earliest"));
>//kafka的消费者不自动提交偏移量
>props.setProperty("enable.auto.commit", 
> parameters2.get("enable.auto.commit","false"));
> 
>List topics = 
> Arrays.asList(parameters2.get("topics").split(","));
> 
> 
> 
>//new KafkaSource instance
>FlinkKafkaConsumer kafkaConsumer = new 
> FlinkKafkaConsumer(
>topics,
>SimpleStringSchema.class.newInstance(),
>props);
> 
>//得到kafka流
>DataStreamSource dataStream = 
> streamEnv.addSource(kafkaConsumer);
> 
>//3.映射为实体
>SingleOutputStreamOperator map = dataStream.map(new 
> Map2EntityFunction()).returns(Class.forName(sourceClass));
> 
>//4.注册一个实例获取column names
>Class clz = Class.forName(sourceClass);
>Object vo = clz.newInstance();
>StringBuilder columnBuilder = new StringBuilder();
>Field[] declaredFields = vo.getClass().getDeclaredFields();
>for (int i = 0; i < declaredFields.length; i++) {
>String fieldName = declaredFields[i].getName();
>columnBuilder.append(fieldName);
>if (i < declaredFields.length - 1) {
>columnBuilder.append(",");
>}
>}
> 
>String fieldsDeclare = columnBuilder.toString();
> 
>System.err.println(fieldsDeclare);
> 
>//5.注册数据表 --注意! 【这里的表名和字段名需要和待处理的执行表达式对应上,对应不上查询会报错】
>tEnv.registerDataStream(sourceName, map,fieldsDeclare);
> 
>//6.执行语句
>Table table = tEnv.sqlQuery(executiveSql);
> 
>//7.print
>tEnv.toAppendStream(table, Row.class).print();//运行时这里不会打印出结果
> 
> 
>//8.execute 
>streamEnv.execute(jobName);
>
>
>
>
> ---
>
> /**
> * 
> * 根据传入的映射类返回一个通用的POJO流
> */
> public class Map2EntityFunction extends RichMapFunction {
> 
> 
>@Override
>public T map(String s) throws Exception {
>System.err.println("receive kafka msg--->"+s); //每次收到消息这里会打印
>ParameterTool params = (ParameterTool) 
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>String sourceClass = params.getRequired("sourceClass");
>Preconditions.checkNotNull(sourceClass);
>Class clz = (Class) Class.forName(sourceClass);
>return JsonUtil.json2object(s, clz);
>}
> 
> 
> }
>
> ---
> 


Re: 报错 Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-25 Thread Xintong Song
>
> 1. 怎么可以确认是leader丢失呢?
>
看下是否能找到类似 "ResourceManager xxx was revoked leadership" 的日志


> 2. 通常是什么原因造成的呢?比如网络延迟?或者机器负载过高?

网络原因是一种可能。另外也可能是 HA service 有问题,要看你集群用的是什么 HA(比如ZooKeeper),排查下 HA 的服务状态是否正常。


> 3. 有什么办法可以恢复吗?

要根据具体原因才能知道如何恢复。
你现在是 yarn-session 上还有正在运行的作业吗?有可能试下停掉再重启 yarn-session 吗?
或者集群上是否有资源可以不停当前 session 再新起一个下看是否能正常工作?资源应该够的吧,这么长时间的话之前起的 TM 应该都释放了。


Thank you~

Xintong Song



On Tue, Aug 25, 2020 at 7:05 PM song wang  wrote:

> 你好,
> 现在yarn-session上还是可以提交新作业的。只是运行时无法分配slot,报错无法解析 resourcemanager 地址。
>
> 如果是RM leadership丢失的话,
> 1. 怎么可以确认是leader丢失呢?
> 2. 通常是什么原因造成的呢?比如网络延迟?或者机器负载过高?
> 3. 有什么办法可以恢复吗?
>
> Xintong Song  于2020年8月25日周二 下午5:26写道:
>
> > >
> > > 出现这个报错后就提交不了任务了
> > >
> > 我确认一下,你之前这句话的意思,是出现这个报错之后,新的作业不能提交了,还是新的作业能提交但是提交之后页报这个找不到 RM 的错?
> >
> > 从 RM 心跳超时但是整个进程还在运行这个现象来看,比较符合 RM leadership 丢失的情况,这种情况下 RM 会停止服务。
> > 如果是新的作业干脆就无法提交了,也符合 rest server leadership 丢失的情况。
> >
> > 我目前怀疑是 HA 出现问题,导致 RM 和 rest server 都认为自己不再是 leader,但是又迟迟没有新的 leader
> 产生。所以对于
> > JobMaster,由于没有发现有新的 RM leader,就会一直尝试重连原来的 RM,而对于 rest server 的表现则是找不到新的
> > leader 无法提交新的作业。
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Aug 25, 2020 at 4:50 PM song wang 
> > wrote:
> >
> > > hi, Xintong:
> > >
> > > 我仔细查看了下日志,发现在报错"Could not resolve ResourceManager address"之前有如下日志:
> > >
> > > 2020-08-22 05:39:24,473 INFO
> > org.apache.flink.runtime.jobmaster.JobMaster
> > >  - The heartbeat of ResourceManager with id
> > > 6724e1ef8ee1c5fe5212eec6182319b6 timed out.
> > > 2020-08-22 05:39:24,473 INFO
> > org.apache.flink.runtime.jobmaster.JobMaster
> > >  - Close ResourceManager connection
> > > 6724e1ef8ee1c5fe5212eec6182319b6: The heartbeat of ResourceManager with
> > id
> > > 6724e1ef8ee1c5fe5212eec6182319b6 timed out..
> > >
> > > 之后就一直报错 "Could not resolve ResourceManager address" 了,
> > > 看了下flink 1.9.0 版本的代码,是在rpcService.connect() 时报的错,
> > > 可是之后就没有日志输出了,单从报错信息来看只是说无法解析地址,
> > > 可是resourcemanager地址是没有问题的。
> > >
> > > 请问有没有办法可以查看resourcemanager的健康状况呢?
> > >
> > >
> > > // 代码:
> > > package org.apache.flink.runtime.registration;
> > > public abstract class RetryingRegistration > > extends RpcGateway, S extends RegistrationResponse.Success> {
> > >public void startRegistration() {
> > >  ...
> > >  if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
> > > rpcGatewayFuture = (CompletableFuture)
> rpcService.connect(
> > >targetAddress,
> > >fencingToken,
> > >targetType.asSubclass(FencedRpcGateway.class));
> > >  } else {
> > > // 连接resourcemanager
> > > rpcGatewayFuture = rpcService.connect(targetAddress,
> > > targetType);
> > >  }
> > >  ...
> > >  rpcGatewayAcceptFuture.whenCompleteAsync(
> > > (Void v, Throwable failure) -> {
> > >if (failure != null && !canceled) {
> > >   final Throwable strippedFailure =
> > > ExceptionUtils.stripCompletionException(failure);
> > >   if (log.isDebugEnabled()) {
> > >  ...
> > >   } else {
> > >  // 报错
> > >  log.info(
> > > "Could not resolve {} address {}, retrying in
> {}
> > > ms:
> > >
> > >
> >
> {}.",targetName,targetAddress,retryingRegistrationConfiguration.getErrorDelayMillis(),strippedFailure.getMessage());
> > >   }
> > >   // 重连
> > >
> > >
> > >
> >
> startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
> > >}
> > > },
> > > rpcService.getExecutor());
> > >}
> > > }
> > >
> > >
> > >
> > >
> >
> ==
> > > 以下是一次提交job的完成报错日志
> > >
> > >
> >
> ==
> > > 2020-08-25 16:02:55,737 INFO
> > >  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
> Received
> > > JobGraph submission 4a324bc1e1eeb964116686e568cea8ad (Streaming
> > WordCount).
> > > 2020-08-25 16:02:55,738 INFO
> > >  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
> > Submitting
> > > job 4a324bc1e1eeb964116686e568cea8ad (Streaming WordCount).
> > > 2020-08-25 16:02:56,552 INFO
> > >  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
> > > Added SubmittedJobGraph(4a324bc1e1eeb964116686e568cea8ad) to ZooKeeper.
> > > 2020-08-25 16:02:56,554 INFO
> > >  org.apache.flink.runtime.rpc.akka.AkkaRpcService  -
> Starting
> > > RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
> > > akka://flink/user/jobmanager_42 .
> > > 2020-08-25 16:02:56,554 INFO
> > org.apache.flink.runtime.jobmaster.JobMaster
> > >  - Initializing job Streaming WordCount
> > > (4a324bc1e1eeb96411668

Re: flink1.11单机执行slot出错

2020-08-25 Thread Xintong Song
是单机运行 standalone 模式吗?感觉像是 TM 没起来。
jps 以下看看 TM 起来了没有,如果没起来的话找下 TM 日志看看具体原因是什么。

Thank you~

Xintong Song



On Wed, Aug 26, 2020 at 9:25 AM 小学生 <201782...@qq.com> wrote:

> 麻烦请教一个问题,在单机的Linux下运行,任务报这个错误,怎么解决呢?
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> No pooled slot available and request to ResourceManager for new slot failed


Re: flink checkpoint导致反压严重

2020-08-25 Thread LakeShen
Hi zhanglachun,

你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢

Best,
LakeShen

徐骁  于2020年8月26日周三 上午2:10写道:

> input
>   .keyBy()
>   .timeWindow()
>   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
>
> 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到
> window 里面
>


Re: 有没有可能使用tikv作为flink 分布式的backend

2020-08-25 Thread wxpcc
了解了,非常感谢这么详细的解答。

提到的改动点和queryable state 我都去详细了解一下



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 报错 Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-25 Thread song wang
1. 我在jobmanager日志中并没有找到相关的日志信息;
2. 用的是zk,看当时的日志有如下报错:
2020-08-22 05:38:30,974 INFO
org.apache.zookeeper.server.PrepRequestProcessor: Got user-level
KeeperException when processing sessionid:0x26ea6955ea90581 type:create
cxid:0x32311f1 zxid:0x2079a5014 txntype:-1 reqpath:n/a Error
Path:/flink-bdp/application_1549925837808_10088957/leaderlatch/c05c969720b830770ffe0395aa37bf42/job_manager_lock
Error:KeeperErrorCode = NoNode for
/flink-bdp/application_1549925837808_10088957/leaderlatch/c05c969720b830770ffe0395aa37bf42/job_manager_lock
不过当时的zk服务是正常的,应该是这个jobmanager有问题。

3. 重新起一个是没有问题的,不过有问题的这个yarn-session就恢复不了了。

Xintong Song  于2020年8月26日周三 上午9:39写道:

> >
> > 1. 怎么可以确认是leader丢失呢?
> >
> 看下是否能找到类似 "ResourceManager xxx was revoked leadership" 的日志
>
>
> > 2. 通常是什么原因造成的呢?比如网络延迟?或者机器负载过高?
>
> 网络原因是一种可能。另外也可能是 HA service 有问题,要看你集群用的是什么 HA(比如ZooKeeper),排查下 HA
> 的服务状态是否正常。
>
>
> > 3. 有什么办法可以恢复吗?
>
> 要根据具体原因才能知道如何恢复。
> 你现在是 yarn-session 上还有正在运行的作业吗?有可能试下停掉再重启 yarn-session 吗?
> 或者集群上是否有资源可以不停当前 session 再新起一个下看是否能正常工作?资源应该够的吧,这么长时间的话之前起的 TM 应该都释放了。
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Aug 25, 2020 at 7:05 PM song wang 
> wrote:
>
> > 你好,
> > 现在yarn-session上还是可以提交新作业的。只是运行时无法分配slot,报错无法解析 resourcemanager 地址。
> >
> > 如果是RM leadership丢失的话,
> > 1. 怎么可以确认是leader丢失呢?
> > 2. 通常是什么原因造成的呢?比如网络延迟?或者机器负载过高?
> > 3. 有什么办法可以恢复吗?
> >
> > Xintong Song  于2020年8月25日周二 下午5:26写道:
> >
> > > >
> > > > 出现这个报错后就提交不了任务了
> > > >
> > > 我确认一下,你之前这句话的意思,是出现这个报错之后,新的作业不能提交了,还是新的作业能提交但是提交之后页报这个找不到 RM 的错?
> > >
> > > 从 RM 心跳超时但是整个进程还在运行这个现象来看,比较符合 RM leadership 丢失的情况,这种情况下 RM 会停止服务。
> > > 如果是新的作业干脆就无法提交了,也符合 rest server leadership 丢失的情况。
> > >
> > > 我目前怀疑是 HA 出现问题,导致 RM 和 rest server 都认为自己不再是 leader,但是又迟迟没有新的 leader
> > 产生。所以对于
> > > JobMaster,由于没有发现有新的 RM leader,就会一直尝试重连原来的 RM,而对于 rest server 的表现则是找不到新的
> > > leader 无法提交新的作业。
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Aug 25, 2020 at 4:50 PM song wang 
> > > wrote:
> > >
> > > > hi, Xintong:
> > > >
> > > > 我仔细查看了下日志,发现在报错"Could not resolve ResourceManager address"之前有如下日志:
> > > >
> > > > 2020-08-22 05:39:24,473 INFO
> > > org.apache.flink.runtime.jobmaster.JobMaster
> > > >  - The heartbeat of ResourceManager with id
> > > > 6724e1ef8ee1c5fe5212eec6182319b6 timed out.
> > > > 2020-08-22 05:39:24,473 INFO
> > > org.apache.flink.runtime.jobmaster.JobMaster
> > > >  - Close ResourceManager connection
> > > > 6724e1ef8ee1c5fe5212eec6182319b6: The heartbeat of ResourceManager
> with
> > > id
> > > > 6724e1ef8ee1c5fe5212eec6182319b6 timed out..
> > > >
> > > > 之后就一直报错 "Could not resolve ResourceManager address" 了,
> > > > 看了下flink 1.9.0 版本的代码,是在rpcService.connect() 时报的错,
> > > > 可是之后就没有日志输出了,单从报错信息来看只是说无法解析地址,
> > > > 可是resourcemanager地址是没有问题的。
> > > >
> > > > 请问有没有办法可以查看resourcemanager的健康状况呢?
> > > >
> > > >
> > > > // 代码:
> > > > package org.apache.flink.runtime.registration;
> > > > public abstract class RetryingRegistration > > > extends RpcGateway, S extends RegistrationResponse.Success> {
> > > >public void startRegistration() {
> > > >  ...
> > > >  if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
> > > > rpcGatewayFuture = (CompletableFuture)
> > rpcService.connect(
> > > >targetAddress,
> > > >fencingToken,
> > > >targetType.asSubclass(FencedRpcGateway.class));
> > > >  } else {
> > > > // 连接resourcemanager
> > > > rpcGatewayFuture = rpcService.connect(targetAddress,
> > > > targetType);
> > > >  }
> > > >  ...
> > > >  rpcGatewayAcceptFuture.whenCompleteAsync(
> > > > (Void v, Throwable failure) -> {
> > > >if (failure != null && !canceled) {
> > > >   final Throwable strippedFailure =
> > > > ExceptionUtils.stripCompletionException(failure);
> > > >   if (log.isDebugEnabled()) {
> > > >  ...
> > > >   } else {
> > > >  // 报错
> > > >  log.info(
> > > > "Could not resolve {} address {}, retrying in
> > {}
> > > > ms:
> > > >
> > > >
> > >
> >
> {}.",targetName,targetAddress,retryingRegistrationConfiguration.getErrorDelayMillis(),strippedFailure.getMessage());
> > > >   }
> > > >   // 重连
> > > >
> > > >
> > > >
> > >
> >
> startRegistrationLater(retryingRegistrationConfiguration.getErrorDelayMillis());
> > > >}
> > > > },
> > > > rpcService.getExecutor());
> > > >}
> > > > }
> > > >
> > > >
> > > >
> > > >
> > >
> >
> ==
> > > > 以下是一次提交job的完成报错日志
> > > >
> > > >
> > >
> >
> ==
> > > > 2020-08-25 16:02:55,737 INFO
> > > >  org

Re: 关于flink任务的日志收集到kafka,可以在logback配置文件中,加如每个job的id或者name吗?

2020-08-25 Thread Yang Wang
我建议可以通env的方式传,在logback或者log4j配置中直接引用相应的env

例如,可以通过如下两个配置传递clusterId到环境变量
containerized.master.env.clusterId=my-flink-cluster
containerized.taskmanager.env.clusterId=my-flink-cluster

另外,也有一些内置的环境变量可以来使用
_FLINK_CONTAINER_ID
_FLINK_NODE_ID


Best,
Yang

zilong xiao  于2020年8月25日周二 下午5:32写道:

> 1:想加入跟业务相关的字段,例如jobId,jobName,可以继承PatternLayout,重写doLayout即可,在方法中对日志进行填充
> 2:这些属性有办法可以从环境变量中获取
>
> Jim Chen  于2020年8月25日周二 下午4:49写道:
>
> > 大家好:
> > 我们在做flink的日志收集到kafak时,使用的logback日志配置文件,目前的pattern是%d{-MM-dd
> > HH:mm:ss.SSS} [%thread] %-5level %logger{60} -
> > %msg,有没有什么办法在里面加入每个job的id,name或者tasknamanger的主机名之类的信息啊。在做ELK的时候,方便查询。
> > 这个配置文件,是整个项目的,是基于Yarn的per job模式,难道每个主类打包的时候,都要改动不同的logbakc配置文件吗?
> >
>


Re: Flink-1.11.1 Application-Mode提交测试

2020-08-25 Thread Yang Wang
你给的这个报错目前并不能发现任何有效的信息,有可能的原因是在运行main的时候出错了
Application模式和perjob模式的很大的一个区别在于用户main运行的位置不一样

你把JobManager的log发一下,这样能看到更多详细的报错信息


Best,
Yang

amen...@163.com  于2020年8月25日周二 下午3:29写道:

> hi, everyone
>
> 当我把jar包都上传至hdfs时,使用如下命令进行application mode提交,
>
> ./bin/flink run-application -t yarn-application
> -Dyarn.provided.lib.dirs="hdfs:///user/flink/lib" -c
> com.yui.flink.demo.Kafka2Mysql hdfs:///user/flink/app_jars/kafka2mysql.jar
>
> 报异常如下:
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
> deploy Yarn Application Cluster
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:414)
> at
> org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:64)
> at
> org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:197)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:919)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by:
> org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
> YARN application unexpectedly switched to state FAILED during deployment.
> Diagnostics from YARN: Application application_1598223665550_0009 failed 1
> times (global limit =2; local limit is =1) due to AM Container for
> appattempt_1598223665550_0009_01 exited with  exitCode: -1
> Failing this attempt.Diagnostics: [2020-08-25 15:12:48.975]Destination
> must be relative
> For more detailed output, check the application tracking page:
> http://ck233:8088/cluster/app/application_1598223665550_0009 Then click
> on links to logs of each attempt.
> . Failing the application.
> If log aggregation is enabled on your cluster, use this command to further
> investigate the issue:
> yarn logs -applicationId application_1598223665550_0009
> at
> org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1021)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:524)
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:407)
> ... 9 more
>
> 其他没有任何的错误了,使用run -m yarn-cluster是可以正常提交的
>
> best,
> amenhub
>


flink1.11 kafka sql connector

2020-08-25 Thread Dream-底限
hi
我正在使用DDL语句创建kafka数据源,但是查看文档时候发现没有报漏参数指定消费者组的offset是否提交,请问这个默认情况下offset会不会提交到kafka分区


Re: flink on yarn 提交作业application模式的2个疑问

2020-08-25 Thread Yang Wang
1. 目前Flink是允许在一个user jar的main里面,运行多次execute的,包括executeAsyc。
每执行一次execute,就是提交了一个任务。所以你是可以这样来实现一个Application里面
运行多个job。但这里需要注意的是,目前只有非HA的Application模式可以支持多job
HA模式还不行,具体原因主要是恢复起来会很麻烦,还没有实现。

2. Flink会检查用户提交的user jar,然后从其中提取出来。具体可以看一下PackagedProgram#loadMainClass
的实现。


Best,
Yang

yang zhang  于2020年8月25日周二 下午1:50写道:

> 1.flink on yarn 的application模式怎么提交多个job组成应用程序呢?在官网和论坛的文章中这里没有详细展开。
>
> 与per-job 模式相比,Application
> 模式允许提交由多个Job组成的应用程序。Job执行的顺序不受部署模式的影响,但受启动Job的调用的影响。使用阻塞的
> execute()方法,将是一个顺序执行的效果,结果就是"下一个"Job的执行被推迟到“该”Job完成为止。相反,一旦提交当前作业,非阻塞executeAsync()方法将立即继续提交“下一个”Job。
>
> 怎么做到呢?
>
> 2.而且提交job时,没有指定运行main方法的类,那么在jobmanager是如何找到对应的执行main呢?
> 官网比如这个指令:
> ./bin/flink run-application -t yarn-application \
> -Djobmanager.memory.process.size=2048m \
> -Dtaskmanager.memory.process.size=4096m \ ./MyApplication.jar
>
>
>
> 发自我的iPhone


Re: 报错 Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-25 Thread Xintong Song
这个报错看起来是 RM 和 RestServer 服务都是正常的,反倒是 JobMaster 没有拿到 leader,所以 RM 没有响应 JM
的请求。

你看下最早出现心跳超时 RM-JM 连接断开的时候,jobmanager 日志里面有没有 "Disconnect job manager xxx
for job xxx from the resource manager." 这样的信息,描述的是 RM 主动断开了与 JM 的连接。
另外,ZK 这个报错是只出现了一次,还是对每个提交后无法调度的作业都出现了?

Thank you~

Xintong Song



On Wed, Aug 26, 2020 at 10:12 AM song wang  wrote:

> 1. 我在jobmanager日志中并没有找到相关的日志信息;
> 2. 用的是zk,看当时的日志有如下报错:
> 2020-08-22 05:38:30,974 INFO
> org.apache.zookeeper.server.PrepRequestProcessor: Got user-level
> KeeperException when processing sessionid:0x26ea6955ea90581 type:create
> cxid:0x32311f1 zxid:0x2079a5014 txntype:-1 reqpath:n/a Error
>
> Path:/flink-bdp/application_1549925837808_10088957/leaderlatch/c05c969720b830770ffe0395aa37bf42/job_manager_lock
> Error:KeeperErrorCode = NoNode for
>
> /flink-bdp/application_1549925837808_10088957/leaderlatch/c05c969720b830770ffe0395aa37bf42/job_manager_lock
> 不过当时的zk服务是正常的,应该是这个jobmanager有问题。
>
> 3. 重新起一个是没有问题的,不过有问题的这个yarn-session就恢复不了了。
>
> Xintong Song  于2020年8月26日周三 上午9:39写道:
>
> > >
> > > 1. 怎么可以确认是leader丢失呢?
> > >
> > 看下是否能找到类似 "ResourceManager xxx was revoked leadership" 的日志
> >
> >
> > > 2. 通常是什么原因造成的呢?比如网络延迟?或者机器负载过高?
> >
> > 网络原因是一种可能。另外也可能是 HA service 有问题,要看你集群用的是什么 HA(比如ZooKeeper),排查下 HA
> > 的服务状态是否正常。
> >
> >
> > > 3. 有什么办法可以恢复吗?
> >
> > 要根据具体原因才能知道如何恢复。
> > 你现在是 yarn-session 上还有正在运行的作业吗?有可能试下停掉再重启 yarn-session 吗?
> > 或者集群上是否有资源可以不停当前 session 再新起一个下看是否能正常工作?资源应该够的吧,这么长时间的话之前起的 TM 应该都释放了。
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Aug 25, 2020 at 7:05 PM song wang 
> > wrote:
> >
> > > 你好,
> > > 现在yarn-session上还是可以提交新作业的。只是运行时无法分配slot,报错无法解析 resourcemanager 地址。
> > >
> > > 如果是RM leadership丢失的话,
> > > 1. 怎么可以确认是leader丢失呢?
> > > 2. 通常是什么原因造成的呢?比如网络延迟?或者机器负载过高?
> > > 3. 有什么办法可以恢复吗?
> > >
> > > Xintong Song  于2020年8月25日周二 下午5:26写道:
> > >
> > > > >
> > > > > 出现这个报错后就提交不了任务了
> > > > >
> > > > 我确认一下,你之前这句话的意思,是出现这个报错之后,新的作业不能提交了,还是新的作业能提交但是提交之后页报这个找不到 RM 的错?
> > > >
> > > > 从 RM 心跳超时但是整个进程还在运行这个现象来看,比较符合 RM leadership 丢失的情况,这种情况下 RM 会停止服务。
> > > > 如果是新的作业干脆就无法提交了,也符合 rest server leadership 丢失的情况。
> > > >
> > > > 我目前怀疑是 HA 出现问题,导致 RM 和 rest server 都认为自己不再是 leader,但是又迟迟没有新的 leader
> > > 产生。所以对于
> > > > JobMaster,由于没有发现有新的 RM leader,就会一直尝试重连原来的 RM,而对于 rest server
> 的表现则是找不到新的
> > > > leader 无法提交新的作业。
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Tue, Aug 25, 2020 at 4:50 PM song wang 
> > > > wrote:
> > > >
> > > > > hi, Xintong:
> > > > >
> > > > > 我仔细查看了下日志,发现在报错"Could not resolve ResourceManager address"之前有如下日志:
> > > > >
> > > > > 2020-08-22 05:39:24,473 INFO
> > > > org.apache.flink.runtime.jobmaster.JobMaster
> > > > >  - The heartbeat of ResourceManager with id
> > > > > 6724e1ef8ee1c5fe5212eec6182319b6 timed out.
> > > > > 2020-08-22 05:39:24,473 INFO
> > > > org.apache.flink.runtime.jobmaster.JobMaster
> > > > >  - Close ResourceManager connection
> > > > > 6724e1ef8ee1c5fe5212eec6182319b6: The heartbeat of ResourceManager
> > with
> > > > id
> > > > > 6724e1ef8ee1c5fe5212eec6182319b6 timed out..
> > > > >
> > > > > 之后就一直报错 "Could not resolve ResourceManager address" 了,
> > > > > 看了下flink 1.9.0 版本的代码,是在rpcService.connect() 时报的错,
> > > > > 可是之后就没有日志输出了,单从报错信息来看只是说无法解析地址,
> > > > > 可是resourcemanager地址是没有问题的。
> > > > >
> > > > > 请问有没有办法可以查看resourcemanager的健康状况呢?
> > > > >
> > > > >
> > > > > // 代码:
> > > > > package org.apache.flink.runtime.registration;
> > > > > public abstract class RetryingRegistration G
> > > > > extends RpcGateway, S extends RegistrationResponse.Success> {
> > > > >public void startRegistration() {
> > > > >  ...
> > > > >  if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
> > > > > rpcGatewayFuture = (CompletableFuture)
> > > rpcService.connect(
> > > > >targetAddress,
> > > > >fencingToken,
> > > > >targetType.asSubclass(FencedRpcGateway.class));
> > > > >  } else {
> > > > > // 连接resourcemanager
> > > > > rpcGatewayFuture = rpcService.connect(targetAddress,
> > > > > targetType);
> > > > >  }
> > > > >  ...
> > > > >  rpcGatewayAcceptFuture.whenCompleteAsync(
> > > > > (Void v, Throwable failure) -> {
> > > > >if (failure != null && !canceled) {
> > > > >   final Throwable strippedFailure =
> > > > > ExceptionUtils.stripCompletionException(failure);
> > > > >   if (log.isDebugEnabled()) {
> > > > >  ...
> > > > >   } else {
> > > > >  // 报错
> > > > >  log.info(
> > > > > "Could not resolve {} address {}, retrying
> in
> > > {}
> > > > > ms:
> > > > >
> > > > >
> > > >
> > >
> >
> {}.",targetName,targetAddress,retryingRegistrationConfiguration.getErrorDelayMillis(),strippedFailure.getMessage());
> > > > >   }
> > > > >   

回复:请教一下flink链接hive的权限控制

2020-08-25 Thread faaron zheng
Hi Rui,感谢你的分享。我简单试了一下开启SQL Standard 
Authorization,没什么效果,不知道是我用的不对还是我们hive被定制过。此外,我发现在使用kerberos的情况下,可以通过hdfs的路径来控制权限,不过这种情况主要对外表比较有效。
 在2020年08月25日 21:34,Rui Li 写道: Hi, 
Authentication的话支持kerberos,应该正常做kinit就可以了。或者可以设置flink 
security相关的参数,如security.kerberos.login.keytab和security.kerberos.login.principal。具体可以参考:
 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems
 Authorization目前HiveCatalog这边没有做。如果你的HMS启用了authorization(比如hive自身的SQL standard 
authorization),那验证的动作应该发生在HMS端,对HiveCatalog也是生效的。 On Tue, Aug 25, 2020 at 4:48 
PM xiaoyan hua  wrote: > 我们当前用的是kerberos认证,需要额外配置什么么? 
xiaoyan hua 邮箱:xiaoyanhua...@gmail.com 签名由 > 网易邮箱大师 定制 在2020年08月25日 
15:54,faaron zheng 写道: Hi all, 我在使用flink > sql-client链接hive 
metastore的时候,发现好像没有做任何权限控制,可以访问所有的表?这一块是没做么?有什么计划么? -- Best regards! Rui Li

Re: flink on yarn 提交作业application模式的2个疑问

2020-08-25 Thread yang zhang
感谢大佬回复,了解了。
那是不是相当于一个main方法
先写A逻辑,然后execute(),
后面写B逻辑,再execute(),
假如B逻辑必须等待A逻辑执行完毕才执行。
用这种方式提交直接就解决了这个场景,也不用第三方通知实现了。




发自我的iPhone

> 在 2020年8月26日,10:44,Yang Wang  写道:
> 
> 1. 目前Flink是允许在一个user jar的main里面,运行多次execute的,包括executeAsyc。
> 每执行一次execute,就是提交了一个任务。所以你是可以这样来实现一个Application里面
> 运行多个job。但这里需要注意的是,目前只有非HA的Application模式可以支持多job
> HA模式还不行,具体原因主要是恢复起来会很麻烦,还没有实现。
> 
> 2. Flink会检查用户提交的user jar,然后从其中提取出来。具体可以看一下PackagedProgram#loadMainClass
> 的实现。
> 
> 
> Best,
> Yang
> 
> yang zhang  于2020年8月25日周二 下午1:50写道:
> 
>> 1.flink on yarn 的application模式怎么提交多个job组成应用程序呢?在官网和论坛的文章中这里没有详细展开。
>> 
>> 与per-job 模式相比,Application
>> 模式允许提交由多个Job组成的应用程序。Job执行的顺序不受部署模式的影响,但受启动Job的调用的影响。使用阻塞的
>> execute()方法,将是一个顺序执行的效果,结果就是"下一个"Job的执行被推迟到“该”Job完成为止。相反,一旦提交当前作业,非阻塞executeAsync()方法将立即继续提交“下一个”Job。
>> 
>> 怎么做到呢?
>> 
>> 2.而且提交job时,没有指定运行main方法的类,那么在jobmanager是如何找到对应的执行main呢?
>> 官网比如这个指令:
>> ./bin/flink run-application -t yarn-application \
>> -Djobmanager.memory.process.size=2048m \
>> -Dtaskmanager.memory.process.size=4096m \ ./MyApplication.jar
>> 
>> 
>> 
>> 发自我的iPhone


Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-25 Thread Yun Tang
Thanks for Zhu's work to manage this release and everyone who contributed to 
this!

Best,
Yun Tang

From: Yangze Guo 
Sent: Tuesday, August 25, 2020 14:47
To: Dian Fu 
Cc: Zhu Zhu ; dev ; user 
; user-zh 
Subject: Re: [ANNOUNCE] Apache Flink 1.10.2 released

Thanks a lot for being the release manager Zhu Zhu!
Congrats to all others who have contributed to the release!

Best,
Yangze Guo

On Tue, Aug 25, 2020 at 2:42 PM Dian Fu  wrote:
>
> Thanks ZhuZhu for managing this release and everyone else who contributed to 
> this release!
>
> Regards,
> Dian
>
> 在 2020年8月25日,下午2:22,Till Rohrmann  写道:
>
> Great news. Thanks a lot for being our release manager Zhu Zhu and to all 
> others who have contributed to the release!
>
> Cheers,
> Till
>
> On Tue, Aug 25, 2020 at 5:37 AM Zhu Zhu  wrote:
>>
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink 1.10.2, which is the first bugfix release for the Apache Flink 1.10 
>> series.
>>
>> Apache Flink® is an open-source stream processing framework for distributed, 
>> high-performing, always-available, and accurate data streaming applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the improvements 
>> for this bugfix release:
>> https://flink.apache.org/news/2020/08/25/release-1.10.2.html
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791
>>
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>>
>> Thanks,
>> Zhu
>
>


Re: 报错 Could not resolve ResourceManager address akka.tcp://flink@hostname:16098/user/resourcemanager

2020-08-25 Thread Xintong Song
另外,可以看下 TM 日志。如果有正在运行的 TM,那么说明 RM 和 TM 之间的心跳是正常的,也就证明 RM 是没有问题的。如果没有 TM
在运行,有可能是因为长时间空闲没有任务运行被释放了,可以找最近被释放的 TM 的日志看下是因为心跳超时释放的,还是空闲超市 RM 主动释放的。


Thank you~

Xintong Song



On Wed, Aug 26, 2020 at 11:07 AM Xintong Song  wrote:

> 这个报错看起来是 RM 和 RestServer 服务都是正常的,反倒是 JobMaster 没有拿到 leader,所以 RM 没有响应 JM
> 的请求。
>
> 你看下最早出现心跳超时 RM-JM 连接断开的时候,jobmanager 日志里面有没有 "Disconnect job manager xxx
> for job xxx from the resource manager." 这样的信息,描述的是 RM 主动断开了与 JM 的连接。
> 另外,ZK 这个报错是只出现了一次,还是对每个提交后无法调度的作业都出现了?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Aug 26, 2020 at 10:12 AM song wang 
> wrote:
>
>> 1. 我在jobmanager日志中并没有找到相关的日志信息;
>> 2. 用的是zk,看当时的日志有如下报错:
>> 2020-08-22 05:38:30,974 INFO
>> org.apache.zookeeper.server.PrepRequestProcessor: Got user-level
>> KeeperException when processing sessionid:0x26ea6955ea90581 type:create
>> cxid:0x32311f1 zxid:0x2079a5014 txntype:-1 reqpath:n/a Error
>>
>> Path:/flink-bdp/application_1549925837808_10088957/leaderlatch/c05c969720b830770ffe0395aa37bf42/job_manager_lock
>> Error:KeeperErrorCode = NoNode for
>>
>> /flink-bdp/application_1549925837808_10088957/leaderlatch/c05c969720b830770ffe0395aa37bf42/job_manager_lock
>> 不过当时的zk服务是正常的,应该是这个jobmanager有问题。
>>
>> 3. 重新起一个是没有问题的,不过有问题的这个yarn-session就恢复不了了。
>>
>> Xintong Song  于2020年8月26日周三 上午9:39写道:
>>
>> > >
>> > > 1. 怎么可以确认是leader丢失呢?
>> > >
>> > 看下是否能找到类似 "ResourceManager xxx was revoked leadership" 的日志
>> >
>> >
>> > > 2. 通常是什么原因造成的呢?比如网络延迟?或者机器负载过高?
>> >
>> > 网络原因是一种可能。另外也可能是 HA service 有问题,要看你集群用的是什么 HA(比如ZooKeeper),排查下 HA
>> > 的服务状态是否正常。
>> >
>> >
>> > > 3. 有什么办法可以恢复吗?
>> >
>> > 要根据具体原因才能知道如何恢复。
>> > 你现在是 yarn-session 上还有正在运行的作业吗?有可能试下停掉再重启 yarn-session 吗?
>> > 或者集群上是否有资源可以不停当前 session 再新起一个下看是否能正常工作?资源应该够的吧,这么长时间的话之前起的 TM 应该都释放了。
>> >
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> >
>> >
>> > On Tue, Aug 25, 2020 at 7:05 PM song wang 
>> > wrote:
>> >
>> > > 你好,
>> > > 现在yarn-session上还是可以提交新作业的。只是运行时无法分配slot,报错无法解析 resourcemanager 地址。
>> > >
>> > > 如果是RM leadership丢失的话,
>> > > 1. 怎么可以确认是leader丢失呢?
>> > > 2. 通常是什么原因造成的呢?比如网络延迟?或者机器负载过高?
>> > > 3. 有什么办法可以恢复吗?
>> > >
>> > > Xintong Song  于2020年8月25日周二 下午5:26写道:
>> > >
>> > > > >
>> > > > > 出现这个报错后就提交不了任务了
>> > > > >
>> > > > 我确认一下,你之前这句话的意思,是出现这个报错之后,新的作业不能提交了,还是新的作业能提交但是提交之后页报这个找不到 RM 的错?
>> > > >
>> > > > 从 RM 心跳超时但是整个进程还在运行这个现象来看,比较符合 RM leadership 丢失的情况,这种情况下 RM 会停止服务。
>> > > > 如果是新的作业干脆就无法提交了,也符合 rest server leadership 丢失的情况。
>> > > >
>> > > > 我目前怀疑是 HA 出现问题,导致 RM 和 rest server 都认为自己不再是 leader,但是又迟迟没有新的 leader
>> > > 产生。所以对于
>> > > > JobMaster,由于没有发现有新的 RM leader,就会一直尝试重连原来的 RM,而对于 rest server
>> 的表现则是找不到新的
>> > > > leader 无法提交新的作业。
>> > > >
>> > > > Thank you~
>> > > >
>> > > > Xintong Song
>> > > >
>> > > >
>> > > >
>> > > > On Tue, Aug 25, 2020 at 4:50 PM song wang > >
>> > > > wrote:
>> > > >
>> > > > > hi, Xintong:
>> > > > >
>> > > > > 我仔细查看了下日志,发现在报错"Could not resolve ResourceManager address"之前有如下日志:
>> > > > >
>> > > > > 2020-08-22 05:39:24,473 INFO
>> > > > org.apache.flink.runtime.jobmaster.JobMaster
>> > > > >  - The heartbeat of ResourceManager with id
>> > > > > 6724e1ef8ee1c5fe5212eec6182319b6 timed out.
>> > > > > 2020-08-22 05:39:24,473 INFO
>> > > > org.apache.flink.runtime.jobmaster.JobMaster
>> > > > >  - Close ResourceManager connection
>> > > > > 6724e1ef8ee1c5fe5212eec6182319b6: The heartbeat of ResourceManager
>> > with
>> > > > id
>> > > > > 6724e1ef8ee1c5fe5212eec6182319b6 timed out..
>> > > > >
>> > > > > 之后就一直报错 "Could not resolve ResourceManager address" 了,
>> > > > > 看了下flink 1.9.0 版本的代码,是在rpcService.connect() 时报的错,
>> > > > > 可是之后就没有日志输出了,单从报错信息来看只是说无法解析地址,
>> > > > > 可是resourcemanager地址是没有问题的。
>> > > > >
>> > > > > 请问有没有办法可以查看resourcemanager的健康状况呢?
>> > > > >
>> > > > >
>> > > > > // 代码:
>> > > > > package org.apache.flink.runtime.registration;
>> > > > > public abstract class RetryingRegistration> Serializable, G
>> > > > > extends RpcGateway, S extends RegistrationResponse.Success> {
>> > > > >public void startRegistration() {
>> > > > >  ...
>> > > > >  if (FencedRpcGateway.class.isAssignableFrom(targetType))
>> {
>> > > > > rpcGatewayFuture = (CompletableFuture)
>> > > rpcService.connect(
>> > > > >targetAddress,
>> > > > >fencingToken,
>> > > > >targetType.asSubclass(FencedRpcGateway.class));
>> > > > >  } else {
>> > > > > // 连接resourcemanager
>> > > > > rpcGatewayFuture = rpcService.connect(targetAddress,
>> > > > > targetType);
>> > > > >  }
>> > > > >  ...
>> > > > >  rpcGatewayAcceptFuture.whenCompleteAsync(
>> > > > > (Void v, Throwable failure) -> {
>> > > > >if (failure != null && !canceled) {
>> > > > >   final Throwable strippedFailure =
>> > > > > ExceptionUtils.stripCompletionException(failure);
>> > > > >   if (log.isDebugEnabled()) {
>> > > > >  ...
>> > > > >   

【闫云鹏】Flink cdc 连接mysql5.7.25报错

2020-08-25 Thread Yan,Yunpeng(DXM,PB)
Hi all:
   使用flink cdc连接mysql 5.7.25  使用默认的8.0.16的mysql驱动报错如下信息
   Caused by: java.sql.SQLNonTransientConnectionException: 
CLIENT_PLUGIN_AUTH is required
   at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
   at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
   at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
   at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
   at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73)
   at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:79)
   at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:835)
   at com.mysql.cj.jdbc.ConnectionImpl.(ConnectionImpl.java:455)
   at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:240)
   at 
com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:199)
   at 
io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:190)
   at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:788)
   at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:783)
   at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:329)
   at 
io.debezium.connector.mysql.MySqlJdbcContext.querySystemVariables(MySqlJdbcContext.java:325)
   ... 11 more
   数据库账户已授权,尝试降低驱动版本
   使用

mysql
mysql-connector-java
5.1.47

   报错
Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
   ... 8 more
[cid:image001.png@01D67B9B.6BAC4E50]
   在MySQLSource中手动添加驱动配置

props.setProperty("database.jdbc.driver", "com.mysql.jdbc.Driver");
不生效,还是使用默认的驱动

尝试修改


1.2.0.Final
发现依赖比较多,请求有没有更好的方式来兼容或者传入驱动
闫云鹏
DXM 支付业务部
地址:北京市海淀区西北旺东路度小满金融总部
邮编:100085
手机:13693668213
邮箱:yanyunp...@duxiaoman.com

度小满金融

精于科技 值得信赖






Re: flink checkpoint导致反压严重

2020-08-25 Thread Yun Tang
Hi

对于已经改为at least once的checkpoint,其在checkpoint时对于作业吞吐的影响只有task 
同步阶段的snapshot,这个时间段的snapshot由于与task的主线程的数据访问持有同一把锁,会影响主线程的数据处理。但是就算这样,我也很怀疑checkpoint本身并不是导致早上10点高峰期无法运行的罪魁祸首。
使用异步的,支持增量的state backend (例如RocksDBStateBackend)会大大缓解该问题。
建议排查思路:

  1.  检查使用的state backend类型
  2.  检查是否存在sync阶段checkpoint用时久的问题(可以观察WEB UI上的checkpoint部分,看sync阶段的耗时)
  3.  借助jstack等工具,检查执行checkpoint时,TM上的task执行逻辑,判断是哪里消耗CPU

祝好
唐云

From: LakeShen 
Sent: Wednesday, August 26, 2020 10:00
To: user-zh 
Subject: Re: flink checkpoint导致反压严重

Hi zhanglachun,

你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢

Best,
LakeShen

徐骁  于2020年8月26日周三 上午2:10写道:

> input
>   .keyBy()
>   .timeWindow()
>   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
>
> 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到
> window 里面
>


关于sink失败 不消费kafka消息的处理

2020-08-25 Thread 范超
大家好,我现在有个疑问
目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?


多谢大家了

范超


??????????????????checkpoint????

2020-08-25 Thread Robert.Zhang
Hi Congxian,


iteration source??barrier??
??barrier??operator??barrier??checkpoint??
??operator
??barrier??checkpoint??







checkpoint









------
??: "Congxian Qiu"https://zhuanlan.zhihu.com/p/87131964
> Best,
> Congxian
>
>
> Robert.Zhang <173603...@qq.com> ??2020??8??21?? 6:31??
>
> > Hello all,
> > iterative stream job
> > 
checkpoint??checkpoint
> > state 
k??org.apache.flink.util.FlinkRuntimeException:
> > Exceeded checkpoint tolerable failure threshold.??
> >
> >
> > ??
> > env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, 
true);
> > CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > checkpointConfig.setCheckpointTimeout(60);
> > checkpointConfig.setMinPauseBetweenCheckpoints(6);
> > checkpointConfig.setMaxConcurrentCheckpoints(4);
> >
> >
> 
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > checkpointConfig.setPreferCheckpointForRecovery(true);
> > checkpointConfig.setTolerableCheckpointFailureNumber(2);
> > checkpointConfig.enableUnalignedCheckpoints();
> >
> >
> > ??

Re: 请教一下flink链接hive的权限控制

2020-08-25 Thread Rui Li
Hi,

不好意思,我查了一下hive文档发现SQL standard
authorization是在CLI/HS2端做的,那HiveCatalog目前没办法支持这种模式。HMS端可以用storage based
authorization,也就是你说的通过HDFS的ACL来控制权限。这种模式对外表和内表都是有效的,但管理起来一般比较繁琐,需要人工去设置路径的ACL。

On Wed, Aug 26, 2020 at 11:08 AM faaron zheng  wrote:

> Hi Rui,感谢你的分享。我简单试了一下开启SQL Standard
> Authorization,没什么效果,不知道是我用的不对还是我们hive被定制过。此外,我发现在使用kerberos的情况下,可以通过hdfs的路径来控制权限,不过这种情况主要对外表比较有效。
> 在2020年08月25日 21:34,Rui Li 写道: Hi,
> Authentication的话支持kerberos,应该正常做kinit就可以了。或者可以设置flink
> security相关的参数,如security.kerberos.login.keytab和security.kerberos.login.principal。具体可以参考:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#auth-with-external-systems
> Authorization目前HiveCatalog这边没有做。如果你的HMS启用了authorization(比如hive自身的SQL
> standard authorization),那验证的动作应该发生在HMS端,对HiveCatalog也是生效的。 On Tue, Aug 25,
> 2020 at 4:48 PM xiaoyan hua  wrote: >
> 我们当前用的是kerberos认证,需要额外配置什么么? xiaoyan hua 邮箱:xiaoyanhua...@gmail.com 签名由 >
> 网易邮箱大师 定制 在2020年08月25日 15:54,faaron zheng 写道: Hi all, 我在使用flink >
> sql-client链接hive metastore的时候,发现好像没有做任何权限控制,可以访问所有的表?这一块是没做么?有什么计划么? --
> Best regards! Rui Li



-- 
Best regards!
Rui Li


Re: flink on yarn 提交作业application模式的2个疑问

2020-08-25 Thread Yang Wang
是的,Application模式是可以这么使用的

但需要注意的是:
* 非HA模式下,如果JobManager failover了,会重新开始执行所有job。
* HA模式,还不能支持多个job


Best,
Yang

yang zhang  于2020年8月26日周三 上午11:17写道:

> 感谢大佬回复,了解了。
> 那是不是相当于一个main方法
> 先写A逻辑,然后execute(),
> 后面写B逻辑,再execute(),
> 假如B逻辑必须等待A逻辑执行完毕才执行。
> 用这种方式提交直接就解决了这个场景,也不用第三方通知实现了。
>
>
>
>
> 发自我的iPhone
>
> > 在 2020年8月26日,10:44,Yang Wang  写道:
> >
> > 1. 目前Flink是允许在一个user jar的main里面,运行多次execute的,包括executeAsyc。
> > 每执行一次execute,就是提交了一个任务。所以你是可以这样来实现一个Application里面
> > 运行多个job。但这里需要注意的是,目前只有非HA的Application模式可以支持多job
> > HA模式还不行,具体原因主要是恢复起来会很麻烦,还没有实现。
> >
> > 2. Flink会检查用户提交的user jar,然后从其中提取出来。具体可以看一下PackagedProgram#loadMainClass
> > 的实现。
> >
> >
> > Best,
> > Yang
> >
> > yang zhang  于2020年8月25日周二 下午1:50写道:
> >
> >> 1.flink on yarn 的application模式怎么提交多个job组成应用程序呢?在官网和论坛的文章中这里没有详细展开。
> >>
> >> 与per-job 模式相比,Application
> >> 模式允许提交由多个Job组成的应用程序。Job执行的顺序不受部署模式的影响,但受启动Job的调用的影响。使用阻塞的
> >>
> execute()方法,将是一个顺序执行的效果,结果就是"下一个"Job的执行被推迟到“该”Job完成为止。相反,一旦提交当前作业,非阻塞executeAsync()方法将立即继续提交“下一个”Job。
> >>
> >> 怎么做到呢?
> >>
> >> 2.而且提交job时,没有指定运行main方法的类,那么在jobmanager是如何找到对应的执行main呢?
> >> 官网比如这个指令:
> >> ./bin/flink run-application -t yarn-application \
> >> -Djobmanager.memory.process.size=2048m \
> >> -Dtaskmanager.memory.process.size=4096m \ ./MyApplication.jar
> >>
> >>
> >>
> >> 发自我的iPhone
>


Re: flink on yarn 提交作业application模式的2个疑问

2020-08-25 Thread yang zhang
好的,谢谢大佬!

发自我的iPhone

> 在 2020年8月26日,11:48,Yang Wang  写道:
> 
> 是的,Application模式是可以这么使用的
> 
> 但需要注意的是:
> * 非HA模式下,如果JobManager failover了,会重新开始执行所有job。
> * HA模式,还不能支持多个job
> 
> 
> Best,
> Yang
> 
> yang zhang  于2020年8月26日周三 上午11:17写道:
> 
>> 感谢大佬回复,了解了。
>> 那是不是相当于一个main方法
>> 先写A逻辑,然后execute(),
>> 后面写B逻辑,再execute(),
>> 假如B逻辑必须等待A逻辑执行完毕才执行。
>> 用这种方式提交直接就解决了这个场景,也不用第三方通知实现了。
>> 
>> 
>> 
>> 
>> 发自我的iPhone
>> 
 在 2020年8月26日,10:44,Yang Wang  写道:
>>> 
>>> 1. 目前Flink是允许在一个user jar的main里面,运行多次execute的,包括executeAsyc。
>>> 每执行一次execute,就是提交了一个任务。所以你是可以这样来实现一个Application里面
>>> 运行多个job。但这里需要注意的是,目前只有非HA的Application模式可以支持多job
>>> HA模式还不行,具体原因主要是恢复起来会很麻烦,还没有实现。
>>> 
>>> 2. Flink会检查用户提交的user jar,然后从其中提取出来。具体可以看一下PackagedProgram#loadMainClass
>>> 的实现。
>>> 
>>> 
>>> Best,
>>> Yang
>>> 
>>> yang zhang  于2020年8月25日周二 下午1:50写道:
>>> 
 1.flink on yarn 的application模式怎么提交多个job组成应用程序呢?在官网和论坛的文章中这里没有详细展开。
 
 与per-job 模式相比,Application
 模式允许提交由多个Job组成的应用程序。Job执行的顺序不受部署模式的影响,但受启动Job的调用的影响。使用阻塞的
 
>> execute()方法,将是一个顺序执行的效果,结果就是"下一个"Job的执行被推迟到“该”Job完成为止。相反,一旦提交当前作业,非阻塞executeAsync()方法将立即继续提交“下一个”Job。
 
 怎么做到呢?
 
 2.而且提交job时,没有指定运行main方法的类,那么在jobmanager是如何找到对应的执行main呢?
 官网比如这个指令:
 ./bin/flink run-application -t yarn-application \
 -Djobmanager.memory.process.size=2048m \
 -Dtaskmanager.memory.process.size=4096m \ ./MyApplication.jar
 
 
 
 发自我的iPhone
>> 


Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-25 Thread Guowei Ma
Hi,

Thanks a lot for being the release manager Zhu Zhu!
Thanks everyone contributed to this!

Best,
Guowei


On Wed, Aug 26, 2020 at 11:18 AM Yun Tang  wrote:

> Thanks for Zhu's work to manage this release and everyone who contributed
> to this!
>
> Best,
> Yun Tang
> 
> From: Yangze Guo 
> Sent: Tuesday, August 25, 2020 14:47
> To: Dian Fu 
> Cc: Zhu Zhu ; dev ; user <
> u...@flink.apache.org>; user-zh 
> Subject: Re: [ANNOUNCE] Apache Flink 1.10.2 released
>
> Thanks a lot for being the release manager Zhu Zhu!
> Congrats to all others who have contributed to the release!
>
> Best,
> Yangze Guo
>
> On Tue, Aug 25, 2020 at 2:42 PM Dian Fu  wrote:
> >
> > Thanks ZhuZhu for managing this release and everyone else who
> contributed to this release!
> >
> > Regards,
> > Dian
> >
> > 在 2020年8月25日,下午2:22,Till Rohrmann  写道:
> >
> > Great news. Thanks a lot for being our release manager Zhu Zhu and to
> all others who have contributed to the release!
> >
> > Cheers,
> > Till
> >
> > On Tue, Aug 25, 2020 at 5:37 AM Zhu Zhu  wrote:
> >>
> >> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.10.2, which is the first bugfix release for the Apache Flink
> 1.10 series.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html
> >>
> >> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> >> https://flink.apache.org/news/2020/08/25/release-1.10.2.html
> >>
> >> The full release notes are available in Jira:
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791
> >>
> >> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> >>
> >> Thanks,
> >> Zhu
> >
> >
>


Re: 关于sink失败 不消费kafka消息的处理

2020-08-25 Thread Benchao Li
这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。

范超  于2020年8月26日周三 上午11:38写道:

> 大家好,我现在有个疑问
> 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
>
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
>
>
> 多谢大家了
>
> 范超
>


-- 

Best,
Benchao Li


Re: ProcessWindowFunction为何在clear方法中无法清理状态-v1.10.1

2020-08-25 Thread shizk233
按我的理解,参考aggregate(AggregateFunction aggFunction,
ProcessWindowFunction windowFunction)方法,
窗口中的状态数据是存在某个聚合函数里的,processWindowFunction只是处理窗口的结果,需要通过context获取对应的窗口state来做清理。

x <35907...@qq.com> 于2020年8月25日周二 下午6:25写道:

>
> 按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买如下env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)…
> .window(TumblingEventTimeWindows.of(Time.days(1)))
> .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
> .evictor(TimeEvictor.of(Time.seconds(0), true))
> .process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
> private var state: MapState[String,Boolean] = _
> override def open
> override def process
> override def clear(ctx: Context): Unit = {
> state.clear()
> }
> }


Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-25 Thread Xingbo Huang
Thanks Zhu for the great work and everyone who contributed to this release!

Best,
Xingbo

Guowei Ma  于2020年8月26日周三 下午12:43写道:

> Hi,
>
> Thanks a lot for being the release manager Zhu Zhu!
> Thanks everyone contributed to this!
>
> Best,
> Guowei
>
>
> On Wed, Aug 26, 2020 at 11:18 AM Yun Tang  wrote:
>
>> Thanks for Zhu's work to manage this release and everyone who contributed
>> to this!
>>
>> Best,
>> Yun Tang
>> 
>> From: Yangze Guo 
>> Sent: Tuesday, August 25, 2020 14:47
>> To: Dian Fu 
>> Cc: Zhu Zhu ; dev ; user <
>> u...@flink.apache.org>; user-zh 
>> Subject: Re: [ANNOUNCE] Apache Flink 1.10.2 released
>>
>> Thanks a lot for being the release manager Zhu Zhu!
>> Congrats to all others who have contributed to the release!
>>
>> Best,
>> Yangze Guo
>>
>> On Tue, Aug 25, 2020 at 2:42 PM Dian Fu  wrote:
>> >
>> > Thanks ZhuZhu for managing this release and everyone else who
>> contributed to this release!
>> >
>> > Regards,
>> > Dian
>> >
>> > 在 2020年8月25日,下午2:22,Till Rohrmann  写道:
>> >
>> > Great news. Thanks a lot for being our release manager Zhu Zhu and to
>> all others who have contributed to the release!
>> >
>> > Cheers,
>> > Till
>> >
>> > On Tue, Aug 25, 2020 at 5:37 AM Zhu Zhu  wrote:
>> >>
>> >> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.10.2, which is the first bugfix release for the Apache Flink
>> 1.10 series.
>> >>
>> >> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>> >>
>> >> The release is available for download at:
>> >> https://flink.apache.org/downloads.html
>> >>
>> >> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> >> https://flink.apache.org/news/2020/08/25/release-1.10.2.html
>> >>
>> >> The full release notes are available in Jira:
>> >>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791
>> >>
>> >> We would like to thank all contributors of the Apache Flink community
>> who made this release possible!
>> >>
>> >> Thanks,
>> >> Zhu
>> >
>> >
>>
>


答复: 关于sink失败 不消费kafka消息的处理

2020-08-25 Thread 范超
感谢,目前也是通过打开checkpoint来改进的,待会测试一下看看是不是可以

-邮件原件-
发件人: Benchao Li [mailto:libenc...@apache.org] 
发送时间: 2020年8月26日 星期三 12:59
收件人: user-zh 
主题: Re: 关于sink失败 不消费kafka消息的处理

这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。

范超  于2020年8月26日周三 上午11:38写道:

> 大家好,我现在有个疑问
> 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
>
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
>
>
> 多谢大家了
>
> 范超
>


-- 

Best,
Benchao Li


Flink??????????????????????

2020-08-25 Thread Sun_yijia



??A??B??AB??
??B??ABA


??FlinkAB

答复: 关于sink失败 不消费kafka消息的处理

2020-08-25 Thread 范超
您好 BenChao ,不知道是否有可以参考的两阶段提交的Flink 实例或者文档资料

-邮件原件-
发件人: Benchao Li [mailto:libenc...@apache.org] 
发送时间: 2020年8月26日 星期三 12:59
收件人: user-zh 
主题: Re: 关于sink失败 不消费kafka消息的处理

这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。

范超  于2020年8月26日周三 上午11:38写道:

> 大家好,我现在有个疑问
> 目前我使用kafka作为source,经过计算以后,将结果sink到数据库;
>
> 后来日志数据库发生了timeout或者宕机,kafka这边的主题,却消费掉了造成了数据丢失,那么如何设置才可以确认在sink失败的时候,不提交kafka的消费位移呢?
>
>
> 多谢大家了
>
> 范超
>


-- 

Best,
Benchao Li