Re: flink1.11版本 -C 指令并未上传udf jar包

2021-04-20 文章 范 佳兴
-C,--classpath  Adds a URL to each user code
  classloader  on all nodes in the
  cluster. The paths must specify a
  protocol (e.g. file://) and be
  accessible on all nodes (e.g. by means
  of a NFS share). You can use this
  option multiple times for specifying
  more than one URL. The protocol must
  be supported by the {@link
  java.net.URLClassLoader}.

-C指定依赖jar包需要放在URLClassLoader能够访问到的位置。



在 2021/4/19 下午10:22,“todd” 写入:

执行指令:flink  run   \
-m yarn-cluster \
-C file:////flink-demo-1.0.jar \
x

在Client端能够构建成功jobgraph,但是在yarn上会报UDF类找不到。我看Classpath中并未上传该JAR包。



--
Sent from: http://apache-flink.147419.n8.nabble.com/




Re: flink1.11版本 -C 指令并未上传udf jar包

2021-04-20 文章 Qishang
Hi todd.
-C 不会上传对应路径下的 jar,最终会被添加到集群的 classpath 中,需要运行的机器对应的路径下要有同样的Jar包才可以。
可以放在私服或者oss的服务,通过 http 的方式加载的 udf jar
-C "http://host:port/xxx.jar";

希望能帮到你。


todd  于2021年4月19日周一 下午10:22写道:

> 执行指令:flink  run   \
> -m yarn-cluster \
> -C file:////flink-demo-1.0.jar \
> x
>
> 在Client端能够构建成功jobgraph,但是在yarn上会报UDF类找不到。我看Classpath中并未上传该JAR包。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink1.11版本 -C 指令并未上传udf jar包

2021-04-19 文章 todd
执行指令:flink  run   \
-m yarn-cluster \
-C file:////flink-demo-1.0.jar \
x

在Client端能够构建成功jobgraph,但是在yarn上会报UDF类找不到。我看Classpath中并未上传该JAR包。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink1.11执行sql当判空使用<> null,程序直接结束

2021-03-19 文章 Benchao Li
嗯,是这样的。

datayangl  于2021年3月19日周五 下午5:55写道:

> calcite解析将<> null 解析为unknown, 在flink优化阶段直接将unkown这个条件默认视为false,通过规则匹配
> 将整条sql优化为values(没有任何结果的sql),于是直接将程序的source task finish了。这个过程我理解的对吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 

Best,
Benchao Li


Re: Flink1.11执行sql当判空使用<> null,程序直接结束

2021-03-19 文章 datayangl
calcite解析将<> null 解析为unknown, 在flink优化阶段直接将unkown这个条件默认视为false,通过规则匹配
将整条sql优化为values(没有任何结果的sql),于是直接将程序的source task finish了。这个过程我理解的对吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink1.11执行sql当判空使用<> null,程序直接结束

2021-03-19 文章 Benchao Li
Hi datayangl,

这是因为kafka_table.src_ip <>
null是恒等于false的,所以这个计算过程就被优化掉了,最后你的作业的逻辑就变成了一个单纯的values,里面没有一条数据。

关于为什么kafka_table.src_ip <> null,这个可以了解一下关于three-value-logic[1].
简单来说,在标准SQL里面,boolean类型是有三种值的,正常的= <>这种算子跟null比较的时候,结果都是unknown,
然后这个在filter条件里面会被视作false。

[1] https://modern-sql.com/concept/three-valued-logic

datayangl  于2021年3月19日周五 下午4:02写道:

> 环境:flink1.11:
> 代码如下:
> val dataStreamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
> val tableEnv: StreamTableEnvironment = FlinkUtils.streamTableEnv
> val sql = """SELECT
>   CASE
> WHEN
>   kafka_table.log_type = 'detect'
>   AND
>   kafka_table.event_level = 3
> THEN 3
> ELSE 0
>   END as weight,
>   kafka_table.src_ip as kafka_table_src_ip_0,
>   kafka_table.dev_type as kafka_table_dev_type_0
> FROM
>   kafka_table
> WHERE
>   kafka_table.event_time >= unix_timestamp() - 60 * 60 * 5
>   AND
>   kafka_table.src_ip <> null
>   AND
>   kafka_table.event_level > 0
>   AND
>   kafka_table.dev_type = 1
>
>
> val data:Table = tableEnv.sqlQuery(sql)
> val result = tableEnv.toRetractStream[Row](data)
> result.print(">")
> """
>
>
>
> 现象:如果判空条件为kafka_table.src_ip <> null,则程序直接结束,没有任何报错,而使用kafka_table.src_ip
> is
> not null 可以正常运行并一直产生数据。
>
> 疑问:我明白is not null是正确的用法,问题是用<> null 为什么程序会直接结束而且没有任何报错,感觉像是当作批处理去运行了。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Flink1.11执行sql当判空使用<> null,程序直接结束

2021-03-19 文章 datayangl
环境:flink1.11:
代码如下:
val dataStreamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
val tableEnv: StreamTableEnvironment = FlinkUtils.streamTableEnv
val sql = """SELECT
  CASE
WHEN
  kafka_table.log_type = 'detect'
  AND
  kafka_table.event_level = 3
THEN 3
ELSE 0
  END as weight,
  kafka_table.src_ip as kafka_table_src_ip_0,
  kafka_table.dev_type as kafka_table_dev_type_0
FROM
  kafka_table
WHERE
  kafka_table.event_time >= unix_timestamp() - 60 * 60 * 5
  AND
  kafka_table.src_ip <> null
  AND
  kafka_table.event_level > 0
  AND 
  kafka_table.dev_type = 1


val data:Table = tableEnv.sqlQuery(sql)
val result = tableEnv.toRetractStream[Row](data)
result.print(">")
"""



现象:如果判空条件为kafka_table.src_ip <> null,则程序直接结束,没有任何报错,而使用kafka_table.src_ip is
not null 可以正常运行并一直产生数据。

疑问:我明白is not null是正确的用法,问题是用<> null 为什么程序会直接结束而且没有任何报错,感觉像是当作批处理去运行了。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink1.11如何实现Tumble Window后基于event time倒序取第一条作统计

2021-03-17 文章 HunterXHunter
GroupWindowAggregate不支持update或者delete的datasource。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink1.11如何实现Tumble Window后基于event time倒序取第一条作统计

2021-03-17 文章 Hush
Hi 大家好


现在想对5分钟的kafka数据开窗,因为是DTS同步消息数据,会有update 和 
delete,所以需要对相同user_id的数据根据事件时间倒序第一条,统计最后一次status(状态字段)共有多少人。


marketingMapDS: DataStream[(String, String, Long)]
|
tEnv.createTemporaryView("test", marketingMapDS,$"status", $"upd_user_id", 
$"upd_time".rowtime)
val resultSQL =
  """
|SELECT t.status,
| COUNT(t.upd_user_id) as num
|FROM (
|SELECT  *,
|  ROW_NUMBER() OVER (PARTITION BY upd_user_id ORDER BY 
upd_time DESC) as row_num
|FROM test
|) t
|WHERE t.row_num = 1
|GROUP BY t.status, TUMBLE(t.upd_time, INTERVAL '5' MINUTE)
|""".stripMargin
val table2 = tEnv.sqlQuery(resultSQL)
val resultDS = tEnv.toRetractStream[Row](table2)
|


这样写后会报以下错:
| Exception in thread "main" org.apache.flink.table.api.TableException: 
GroupWindowAggregate doesn't support consuming update and delete changes which 
is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[upd_user_id], 
orderBy=[upd_time DESC], select=[status, upd_user_id, upd_time]) |


所以想实现该需求,请问还可以怎么实现。。。


TABLE API 可以实现 类似 ROW_NUMBER() OVER 这样功能吗?
|
 val table = tEnv.fromDataStream(marketingMapDS, $"status", $"upd_user_id", 
$"upd_time".rowtime)
  .window(Tumble over 5.millis on $"upd_time" as "w")
  .groupBy($"w")
???
|


Flink新手一个。。。请大佬指点~



??????????Flink1.11??flink-runtime-web????

2021-03-03 文章 Natasha
hi Michael,
    ??




--  --
??: 
   "user-zh"



Re:回复:编译Flink1.11的flink-runtime-web失败

2021-03-02 文章 Michael Ran



flink-test-utils-junit 单独编译下。 缺什么编译什么就行














在 2021-03-03 10:57:27,"Natasha"  写道:
>hi Michael,
> 我拉取flink 1.11 realse分支后,可以看到flink-runtime-web中的版本就是1.11-SNAPSHOT。
>
>
>
>
>
>-- 原始邮件 --
>发件人:   
> "user-zh" 
>   
>发送时间: 2021年3月3日(星期三) 上午10:50
>收件人: "user-zh"
>主题: Re:编译Flink1.11的flink-runtime-web失败
>
>
>
>为什么还会依赖 -SNAPSHOT 的jar。不是release 的 的版本吗?
>
>
>
>在 2021-03-03 10:34:23,"Natasha" 
>hi,all
>我在编译Flink1.11,由于每次到flink-runtime-web都失败,于是我cd flink-runtime-web进行单独编译,发现
>Cannot resolve org.apache.flink:flink-test-utils-junit:1.11-SNAPSHOT,
>Cannot resolve org.apache.flink:flink-test-utils_2.11:1.11-SNAPSHOT
>依赖一直无法下载下来。请问有好的解决方法吗?
>    感谢你们在百忙之中看到我的邮件!


??????????Flink1.11??flink-runtime-web????

2021-03-02 文章 Natasha
hi Michael,
 ??flink 1.11 
realseflink-runtime-web1.11-SNAPSHOT??





--  --
??: 
   "user-zh"



??????????Flink1.11??flink-runtime-web????

2021-03-02 文章 Natasha
hi Michael,
 ??flink 1.11 
realseflink-runtime-web1.11-SNAPSHOT??







--  --
??: 
   "user-zh"



Re:????Flink1.11??flink-runtime-web????

2021-03-02 文章 Michael Ran
?? -SNAPSHOT ??jar??release ?? ??



?? 2021-03-03 10:34:23??"Natasha"  ??

hi??all
????Flink1.11flink-runtime-web??cd 
flink-runtime-web??
Cannot resolve org.apache.flink:flink-test-utils-junit:1.11-SNAPSHOT??
Cannot resolve org.apache.flink:flink-test-utils_2.11:1.11-SNAPSHOT



????Flink1.11??flink-runtime-web????

2021-03-02 文章 Natasha
hi??all
Flink1.11flink-runtime-web??cd flink-runtime-web??
Cannot resolve org.apache.flink:flink-test-utils-junit:1.11-SNAPSHOT??
Cannot resolve org.apache.flink:flink-test-utils_2.11:1.11-SNAPSHOT

    

Re: flink1.11的Streaming File Sink问题

2021-02-23 文章 Robin Zhang
Hi, op
   
flink内部可以实现exactly-once语义,但是写到hdfs是至少一次的语义,如果任务失败重新启动会发生数据重复的问题,所以需要自己增加逻辑处理。

Best,
Robin


op wrote
> 大家好:
>     我想知道flink1.11的Streaming File
> Sink保存流数据到hdfs支持exactly-once语义吗,官网好像没说,谢谢!





--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink1.11??Streaming File Sink????

2021-02-22 文章 op

    flink1.11??Streaming File 
Sinkhdfsexactly-once

Re: flink1.11使用createStatementSet报错 No operators defined in streaming topology

2021-01-14 文章 Evan
我的也是flink 1.11.0版本的,也是使用的stmtSet.execute()方式,是可以正常运行的,你可以debug检查一下你要执行的SQL语句



 
发件人: datayangl
发送时间: 2021-01-14 16:13
收件人: user-zh
主题: flink1.11使用createStatementSet报错 No operators defined in streaming topology
flink版本: 1.11
使用createStatementSet 在一个人任务中完成多个数据从hive写入不同的kafka
代码如下:
  def main(args: Array[String]): Unit = {
FlinkUtils.initTable()
val tableEnv: StreamTableEnvironment = FlinkUtils.tableEnv
val streamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
streamEnv.disableOperatorChaining()
streamEnv.setParallelism(1)
streamEnv.setMaxParallelism(1)
CheckPointUtils.setCheckPoint(streamEnv, 12, 6)
dealWithOdsDataTohive(tableEnv)
val sqls:Map[String,String] = ConfigItem.ODS_SQL
 
val ODS_TOPIC_SWITCH_ON = ConfigItem.APP_SOURCES.getOrElse("ODS2HIVE",
null).map(x => DictClass.logTypeAndTopic.getOrElse(x, "")).toSet
 
val filledAllSqlsTable = sqls.map(x=>{
  val hiveMapTopic = hiveTableMapTopic
  val topicName = hiveMapTopic.getOrElse(x._1,null)
  val topic = if(ODS_TOPIC_SWITCH_ON.contains(topicName)) topicName else
null
  (x._1,topic,x._2)
}).filter(x=>StringUtils.isNotEmpty(x._2)).map(x=>{
  val sql = fillTemplate(x._1,x._2,x._3)
  tableEnv.executeSql(sql)
  x._1
})
HiveUtils.initHiveCatalog("tsgz","catalogName", tableEnv)
val stmtSet = tableEnv.createStatementSet()
val allInsertSqls = filledAllSqlsTable.map(table=>{
  s"insert into tsgz.${table} select * from
default_catalog.default_database.${table}"
}).toList
allInsertSqls.foreach(x=>{
  stmtSet.addInsertSql(x)
})
val insertTaskStatus = stmtSet.execute()
//insertTaskStatus.print()
println(insertTaskStatus.getJobClient.get().getJobStatus())
}
  /**
   * 填充kafka sql映射表的模板内容
   * */
  def fillTemplate(tableName:String, topicName:String, fields:String)={
val kafkaHost = ConfigItem.KAFKA_BOOTSTRAP_SERVERS
val filled = s"create table ${tableName} (${fields}) with ('connector' =
'kafka','topic' = '${topicName}','properties.bootstrap.servers' =
'${kafkaHost}','properties.group.id' = 'OdsDataToHive1','format' =
'json','scan.startup.mode' = 'latest-offset')"
 
filled
  }
 
执行后报错
Exception in thread "main" java.lang.IllegalStateException: No operators
defined in streaming topology. Cannot generate StreamGraph.
at
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703)
at
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
at
com.etl.chaitin.main.OdsDataToHive$.dealWithOdsDataTohive(OdsDataToHive.scala:54)
at com.etl.chaitin.main.OdsDataToHive$.main(OdsDataToHive.scala:21)
at com.etl.chaitin.main.OdsDataToHive.main(OdsDataToHive.scala)
 
报错位置为 val insertTaskStatus = stmtSet.execute() 这一行。
 
 
参考资料:https://www.bookstack.cn/read/flink-1.11.1-zh/dc487098ce87ed44.md
 
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/
 


flink1.11使用createStatementSet报错 No operators defined in streaming topology

2021-01-14 文章 datayangl
flink版本: 1.11
使用createStatementSet 在一个人任务中完成多个数据从hive写入不同的kafka
代码如下:
  def main(args: Array[String]): Unit = {
FlinkUtils.initTable()
val tableEnv: StreamTableEnvironment = FlinkUtils.tableEnv
val streamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv
streamEnv.disableOperatorChaining()
streamEnv.setParallelism(1)
streamEnv.setMaxParallelism(1)
CheckPointUtils.setCheckPoint(streamEnv, 12, 6)
dealWithOdsDataTohive(tableEnv)
 val sqls:Map[String,String] = ConfigItem.ODS_SQL

val ODS_TOPIC_SWITCH_ON = ConfigItem.APP_SOURCES.getOrElse("ODS2HIVE",
null).map(x => DictClass.logTypeAndTopic.getOrElse(x, "")).toSet

val filledAllSqlsTable = sqls.map(x=>{
  val hiveMapTopic = hiveTableMapTopic
  val topicName = hiveMapTopic.getOrElse(x._1,null)
  val topic = if(ODS_TOPIC_SWITCH_ON.contains(topicName)) topicName else
null
  (x._1,topic,x._2)
}).filter(x=>StringUtils.isNotEmpty(x._2)).map(x=>{
  val sql = fillTemplate(x._1,x._2,x._3)
  tableEnv.executeSql(sql)
  x._1
})
HiveUtils.initHiveCatalog("tsgz","catalogName", tableEnv)
val stmtSet = tableEnv.createStatementSet()
val allInsertSqls = filledAllSqlsTable.map(table=>{
  s"insert into tsgz.${table} select * from
default_catalog.default_database.${table}"
}).toList
allInsertSqls.foreach(x=>{
  stmtSet.addInsertSql(x)
})
val insertTaskStatus = stmtSet.execute()
//insertTaskStatus.print()
println(insertTaskStatus.getJobClient.get().getJobStatus())
}
  /**
   * 填充kafka sql映射表的模板内容
   * */
  def fillTemplate(tableName:String, topicName:String, fields:String)={
val kafkaHost = ConfigItem.KAFKA_BOOTSTRAP_SERVERS
val filled = s"create table ${tableName} (${fields}) with ('connector' =
'kafka','topic' = '${topicName}','properties.bootstrap.servers' =
'${kafkaHost}','properties.group.id' = 'OdsDataToHive1','format' =
'json','scan.startup.mode' = 'latest-offset')"

filled
  }

执行后报错
Exception in thread "main" java.lang.IllegalStateException: No operators
defined in streaming topology. Cannot generate StreamGraph.
at
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703)
at
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:97)
at
com.etl.chaitin.main.OdsDataToHive$.dealWithOdsDataTohive(OdsDataToHive.scala:54)
at com.etl.chaitin.main.OdsDataToHive$.main(OdsDataToHive.scala:21)
at com.etl.chaitin.main.OdsDataToHive.main(OdsDataToHive.scala)

报错位置为 val insertTaskStatus = stmtSet.execute() 这一行。


参考资料:https://www.bookstack.cn/read/flink-1.11.1-zh/dc487098ce87ed44.md




--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink1.11 mysql cdc checkpoint 失败后程序自动恢复,同步数据出现重复

2021-01-03 文章 lingchanhu
sourcr:mysql-cdc
sink:elasticsearch

问题描述:
从mysql中同步表数据至elasticsearch后,进行新增再删除的某条数据出现问题,导致sink失败(没加primary
key)。checkpoint失败,程序自动恢复重启后,checkpoint 成功,但是elasticsearch 中的数据是mysql
表中的两倍,出现重复同步情况。
程序的自动恢复不应该是从当前checkpoint 中记录的binlog 位置再同步么?为什么会再重头同步一次呢?
(ddl 中写死了server-id,
"  'table-name' = '"+ table +"'," +
"  'server-id' = '"+ serverId +"'" + )


日志:






--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink1.11 flinksql 滑动窗口问题反馈

2020-12-29 文章 fan_future

 
需求:每隔5分钟输出凌晨到当前的数据量

方案:使用滑动窗口,步长为5,通过where条件过滤出今天的数据进行count(1)

现象:00:05凌晨5分的时候窗口触发,发现累加值并不是今天的数据量,而是在当前窗口内的所有数据,不知道是不是bug,或者是我这边条件使用不正确,还麻烦社区帮忙解决下

理解:咨询了一下,好像这个where条件是在数据源端触发的,也就是数据进来的时候,符合条件的数据才流到窗口内,而不是在窗口内触发where条件过滤的,不知道有什么方法可以在窗口内触发filter不



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: yarn.provided.lib.dirs在flink1.11 yarn提交不生效

2020-12-24 文章 Yang Wang
非常不建议你将非Flink binary的jar存放到yarn.provided.lib.dirs,因为这个下面的jar会以Yarn public
distributed cache的方式进行分发
并在NodeManager上缓存,共享给所有的application使用

你这个报错的根本原因是本地运行main的时候udf还是在hdfs上,所以报错在client端了

有两个办法修复:
1. 不要将udf放到hdfs上的provided lib dirs,除非你确实想将它共享给很多application
2.
使用application模式[1],这种情况用户的main是在JobManager端运行的,provided下面的jar已经都下载并且加入classpath了

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#application-mode

Best,
Yang

zhisheng  于2020年12月25日周五 上午11:26写道:

> hi
>
> 使用 -Dyarn.provided.lib.dirs 试试
>
> Best
> zhisheng
>
> datayangl  于2020年12月22日周二 下午4:56写道:
>
> >
> >
> > flink1.11 on yarn模式,我提前将flink
> > lib下的依赖及自定义函数jar上传到hdfs上,提交时使用yarn.provided.lib.dirs
> > 指定hdfs的依赖路径。原本设想程序中使用反射去寻找自定义函数的类并且实例化,但是提交时报错,程序并没有找到自定义函数的路径
> >
> > 提交命令:/usr/hdp/flink1.11/bin/flink run -m yarn-cluster -d -ynm udf-test
> -yD
> > yarn.provided.lib.dirs=hdfs://ip:8020/flink-yarn/jars -c
> > com.ly.common.udf.demo.FlinkUDFDemo  /data/bigdata/jars/udf-test.jar
> >
> > 相关信息如下:
> > 2020-12-22 08:41:11,157 INFO
> > org.apache.flink.yarn.cli.FlinkYarnSessionCli
> > [] - Dynamic Property set:
> > yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> > 2020-12-22 08:41:11,157 INFO
> > org.apache.flink.yarn.cli.FlinkYarnSessionCli
> > [] - Dynamic Property set:
> > yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> > -- class path:
> /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
> >
> > 
> >  The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method
> > caused an error: object com.ly.third.udf.flink.SortKey not found.
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > at
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > at
> > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > at
> >
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> > at
> >
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at
> > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > Caused by: scala.ScalaReflectionException: object
> > com.ly.third.udf.flink.SortKey not found.
> > at
> > scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
> > at
> > scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
> > at
> >
> >
> com.ly.common.udf.reflect.RegisterFlinkFunction$.loadFlinkFunction(RegisterFlinkFunction.scala:14)
> > at
> com.ly.common.udf.demo.FlinkUDFDemo$.main(FlinkUDFDemo.scala:27)
> > at com.ly.common.udf.demo.FlinkUDFDemo.main(FlinkUDFDemo.scala)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > ... 11 more
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: yarn.provided.lib.dirs在flink1.11 yarn提交不生效

2020-12-24 文章 datayangl
用-D 还是加载不了,难道yarn.provided.lib.dirs只有application mode支持???
我看阿里云有yarn-cluster的例子:
https://developer.aliyun.com/article/762501?spm=a2c6h.12873639.0.0.14ac3a9eM6GNSi

  

目前可以用-C加载本地自定义函数jar,但是需要所有节点都有指定的jar,但是这不是我想要的效果。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: yarn.provided.lib.dirs在flink1.11 yarn提交不生效

2020-12-24 文章 zhisheng
hi

使用 -Dyarn.provided.lib.dirs 试试

Best
zhisheng

datayangl  于2020年12月22日周二 下午4:56写道:

>
>
> flink1.11 on yarn模式,我提前将flink
> lib下的依赖及自定义函数jar上传到hdfs上,提交时使用yarn.provided.lib.dirs
> 指定hdfs的依赖路径。原本设想程序中使用反射去寻找自定义函数的类并且实例化,但是提交时报错,程序并没有找到自定义函数的路径
>
> 提交命令:/usr/hdp/flink1.11/bin/flink run -m yarn-cluster -d -ynm udf-test -yD
> yarn.provided.lib.dirs=hdfs://ip:8020/flink-yarn/jars -c
> com.ly.common.udf.demo.FlinkUDFDemo  /data/bigdata/jars/udf-test.jar
>
> 相关信息如下:
> 2020-12-22 08:41:11,157 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
> [] - Dynamic Property set:
> yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> 2020-12-22 08:41:11,157 INFO
> org.apache.flink.yarn.cli.FlinkYarnSessionCli
> [] - Dynamic Property set:
> yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
> -- class path: /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: object com.ly.third.udf.flink.SortKey not found.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: scala.ScalaReflectionException: object
> com.ly.third.udf.flink.SortKey not found.
> at
> scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
> at
> scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
> at
>
> com.ly.common.udf.reflect.RegisterFlinkFunction$.loadFlinkFunction(RegisterFlinkFunction.scala:14)
> at com.ly.common.udf.demo.FlinkUDFDemo$.main(FlinkUDFDemo.scala:27)
> at com.ly.common.udf.demo.FlinkUDFDemo.main(FlinkUDFDemo.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 11 more
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


yarn.provided.lib.dirs在flink1.11 yarn提交不生效

2020-12-22 文章 datayangl



flink1.11 on yarn模式,我提前将flink
lib下的依赖及自定义函数jar上传到hdfs上,提交时使用yarn.provided.lib.dirs
指定hdfs的依赖路径。原本设想程序中使用反射去寻找自定义函数的类并且实例化,但是提交时报错,程序并没有找到自定义函数的路径

提交命令:/usr/hdp/flink1.11/bin/flink run -m yarn-cluster -d -ynm udf-test -yD
yarn.provided.lib.dirs=hdfs://ip:8020/flink-yarn/jars -c
com.ly.common.udf.demo.FlinkUDFDemo  /data/bigdata/jars/udf-test.jar

相关信息如下:
2020-12-22 08:41:11,157 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
  
[] - Dynamic Property set:
yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
2020-12-22 08:41:11,157 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
  
[] - Dynamic Property set:
yarn.provided.lib.dirs=hdfs://chaitin/flink-yarn/jars
-- class path: /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: object com.ly.third.udf.flink.SortKey not found.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: scala.ScalaReflectionException: object
com.ly.third.udf.flink.SortKey not found.
at 
scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
at 
scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
at
com.ly.common.udf.reflect.RegisterFlinkFunction$.loadFlinkFunction(RegisterFlinkFunction.scala:14)
at com.ly.common.udf.demo.FlinkUDFDemo$.main(FlinkUDFDemo.scala:27)
at com.ly.common.udf.demo.FlinkUDFDemo.main(FlinkUDFDemo.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11 datastream elasticsearch sink 写入es需要账号密码验证,目前支持这种操作吗?

2020-12-15 文章 Yangze Guo
1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Wed, Dec 16, 2020 at 11:34 AM 李世钰  wrote:
>
> flink1.11  datastream  elasticsearch sink 写入es需要账号密码验证,目前支持这种操作吗?
> elasticsearch7.0
>
>
>
>
>
>
> --
>
> --
>
> 李世钰
>
> Mail:m...@lishiyu.cn
>
> TEL:18801236165
>
> Motto:让身边的人快乐,你的身边就充满快乐!
>
>
>
>
>
>
>  
>
>
>
>  


flink1.11 datastream elasticsearch sink 写入es需要账号密码验证,目前支持这种操作吗?

2020-12-15 文章 李世钰
flink1.11  datastream  elasticsearch sink 写入es需要账号密码验证,目前支持这种操作吗?
elasticsearch7.0






--

--

李世钰

Mail:m...@lishiyu.cn

TEL:18801236165

Motto:让身边的人快乐,你的身边就充满快乐!






 

 

 

Re:Re: flink1.11编译失败

2020-11-26 文章 hailongwang
Hi,
  这个错误应该是找到了老的这个类的 .class 文件。
  你可以编译下整个工程"mvn clean install -DskipTests"。


Best,
Hailong

在 2020-11-27 14:20:42,"zhy"  写道:
>hi、
>感谢,确实是这个问题,我现在跑测试用例遇到下面问题,请问什么原因哪,我看文件是存在的
>Error:(32, 25) object contrib is not a member of package org.apache.flink
>import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
>
>Leonard Xu  于2020年11月26日周四 下午7:58写道:
>
>> HI
>>
>> 这两个类是 codegen 生成的,所以源码里没有,你编译下flink-sql-parser模块就会自动生成这几个类。
>>
>> 祝好,
>> Leonard
>>
>> > 在 2020年11月26日,19:43,zhy  写道:
>> >
>> > hi、flink1.11 release source编译为什么会缺失类文件,去github仓库也没找到,如何解决这个问题~
>> >
>> >
>> > import org.apache.flink.sql.parser.impl.ParseException;
>> >
>> > import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
>> >
>> > Error:(39, 87) java: 找不到符号
>> >  符号:   类 ParseException
>> >  位置: 类
>> > org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableAddReplaceColumn
>>
>>


Re: flink1.11编译失败

2020-11-26 文章 zhy
hi、
感谢,确实是这个问题,我现在跑测试用例遇到下面问题,请问什么原因哪,我看文件是存在的
Error:(32, 25) object contrib is not a member of package org.apache.flink
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

Leonard Xu  于2020年11月26日周四 下午7:58写道:

> HI
>
> 这两个类是 codegen 生成的,所以源码里没有,你编译下flink-sql-parser模块就会自动生成这几个类。
>
> 祝好,
> Leonard
>
> > 在 2020年11月26日,19:43,zhy  写道:
> >
> > hi、flink1.11 release source编译为什么会缺失类文件,去github仓库也没找到,如何解决这个问题~
> >
> >
> > import org.apache.flink.sql.parser.impl.ParseException;
> >
> > import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
> >
> > Error:(39, 87) java: 找不到符号
> >  符号:   类 ParseException
> >  位置: 类
> > org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableAddReplaceColumn
>
>


Re: flink1.11编译失败

2020-11-26 文章 Leonard Xu
HI

这两个类是 codegen 生成的,所以源码里没有,你编译下flink-sql-parser模块就会自动生成这几个类。

祝好,
Leonard

> 在 2020年11月26日,19:43,zhy  写道:
> 
> hi、flink1.11 release source编译为什么会缺失类文件,去github仓库也没找到,如何解决这个问题~
> 
> 
> import org.apache.flink.sql.parser.impl.ParseException;
> 
> import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
> 
> Error:(39, 87) java: 找不到符号
>  符号:   类 ParseException
>  位置: 类
> org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableAddReplaceColumn



flink1.11编译失败

2020-11-26 文章 zhy
hi、flink1.11 release source编译为什么会缺失类文件,去github仓库也没找到,如何解决这个问题~


import org.apache.flink.sql.parser.impl.ParseException;

import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;

Error:(39, 87) java: 找不到符号
  符号:   类 ParseException
  位置: 类
org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveTableAddReplaceColumn


Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?

2020-11-17 文章 lingchanhu
感谢,已经解决了!

BR,
lingchanhu



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?

2020-11-17 文章 Jark Wu
通过 StreamTableEnvironmentImpl 构造函数直接构造一个 isStreamingMode = false
的 StreamTableEnvironmentImpl。
然后就可以在这个上面调用 registerFunction 了。

On Wed, 18 Nov 2020 at 10:40, lingchanhu  wrote:

> 非常感谢,如果flink1.11 目前不支持的话,那对于这种场景的使用有什么建议么?想要批处理数据,其中又要用到自定义的agg function?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?

2020-11-17 文章 lingchanhu
非常感谢,如果flink1.11 目前不支持的话,那对于这种场景的使用有什么建议么?想要批处理数据,其中又要用到自定义的agg function?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink1.11 TableEnvironment 不支持注册 Aggregate Function?

2020-11-17 文章 lingchanhu
*flink1.11*
在TableEnvironment环境中注册并使用自定义的Aggregate
Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment 注册和使用则是正常,这应该说明自定义的函数是ok的)

org.apache.flink.table.api.TableException: Aggregate functions are not
updated to the new type system yet.
at
org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
at
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
at
org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
at
com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)

*// 以下是代码*
// main
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tEnv = TableEnvironment.create(envSettings);

// 注册source table, jdbc table source
tEnv.executeSql("CREATE TABLE wx_event_log () with
('connect.type'='jdbc'),");

// 注册sink table,csv table sink
tEnv.executeSql("CREATE TABLE wx_data_statistics () with
('connect.type'='filesystem','format.type'='csv',.)");

// 注册agg function
tEnv.createTemporarySystemFunction("firSendMsgFunc",new FirstSendMsgFunc());

Table table2 = tEnv.sqlQuery("select from_user,create_time from wx_event_log
where msg_type='text' and create_time between '2020-03-20' and
'2020-03-21'");

table2.groupBy($("from_user"))
   
.aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today"))
.select($("from_user"),$("first_send_msg_today"))
.executeInsert("wx_data_statistics");


// 自定义agg function类
public class FirstSendMsgFunc extends
AggregateFunction {

public void accumulate(CountDTO acc, LocalDateTime createTime) {
if (acc.getDateTime() == null) {
acc.setDateTime(createTime);
} else if (acc.getDateTime().isAfter(createTime)) {
acc.setDateTime(createTime);
}
}

@Override
public LocalDateTime getValue(CountDTO acc) {
return acc.getDateTime();
}

@Override
public CountDTO createAccumulator() {
return new CountDTO();
}
}

// accumulate pojo 类
public class 

Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?

2020-11-17 文章 Jark Wu
Btw, 1.12 版本 TableEnvironment#createTemporarySystemFunction 接口支持
AggregateFunction了。

On Wed, 18 Nov 2020 at 10:34, Jark Wu  wrote:

> 1.11 版本上 TableEnvironment#createTemporarySystemFunction 接口暂时还不支持
> AggregateFunction。
> 你说 StreamTableEnvironment 可以,我估计你用的是
> StreamTableEnvironment#registerFunction, 这个是支持 AggregateFunction 的。
>
> Best,
> Jark
>
>
> On Wed, 18 Nov 2020 at 09:49, lingchanhu  wrote:
>
>> *flink1.11*
>> 在TableEnvironment环境中注册并使用自定义的Aggregate
>> Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment
>> 注册和使用则是正常,这应该说明自定义的函数是ok的)
>>
>> org.apache.flink.table.api.TableException: Aggregate functions are not
>> updated to the new type system yet.
>> at
>>
>> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
>> at
>>
>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>> at
>>
>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
>> at
>> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
>> at
>>
>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>> at
>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>> at
>>
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at
>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at
>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
>> at
>>
>> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
>> at java.util.function.Function.lambda$andThen$1(Function.java:88)
>> at
>>
>> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
>> at
>>
>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>> at
>>
>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
>> at
>>
>> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
>> at
>>
>> com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
>> at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)
>>
>> *// 以下是代码*
>> // main
>> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inBatchMode()
>> .build();
>>
>> TableEnvironment tEnv = TableEnvironment.create(envSettings);
>>
>> // 注册source table, jdbc table source
>> tEnv.executeSql("CREATE TABLE wx_event_log () with
>> ('connect.type'='jdbc'),");
>>
>> // 注册sink table,csv table sink
>> tEnv.executeSql("CREATE TABLE wx_data_statistics () with
>&g

Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?

2020-11-17 文章 Jark Wu
1.11 版本上 TableEnvironment#createTemporarySystemFunction 接口暂时还不支持
AggregateFunction。
你说 StreamTableEnvironment 可以,我估计你用的是
StreamTableEnvironment#registerFunction, 这个是支持 AggregateFunction 的。

Best,
Jark


On Wed, 18 Nov 2020 at 09:49, lingchanhu  wrote:

> *flink1.11*
> 在TableEnvironment环境中注册并使用自定义的Aggregate
> Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment
> 注册和使用则是正常,这应该说明自定义的函数是ok的)
>
> org.apache.flink.table.api.TableException: Aggregate functions are not
> updated to the new type system yet.
> at
>
> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
> at
>
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
> at
>
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
> at
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
> at
>
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
> at
>
> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
> at java.util.function.Function.lambda$andThen$1(Function.java:88)
> at
>
> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
> at
>
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
> at
>
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
> at
>
> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
> at
>
> com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
> at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)
>
> *// 以下是代码*
> // main
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inBatchMode()
> .build();
>
> TableEnvironment tEnv = TableEnvironment.create(envSettings);
>
> // 注册source table, jdbc table source
> tEnv.executeSql("CREATE TABLE wx_event_log () with
> ('connect.type'='jdbc'),");
>
> // 注册sink table,csv table sink
> tEnv.executeSql("CREATE TABLE wx_data_statistics () with
> ('connect.type'='filesystem','format.type'='csv',.)");
>
> // 注册agg function
> tEnv.createTemporarySystemFunction("firSendMsgFunc",new
> FirstSendMsgFunc());
>
> Table table2 = tEnv.sqlQuery("select from_user,create_time from
> wx_event_log
> where msg_type='text' and create_time between '2020-03-20' and
> '2020-03-21'");
>
> table2.groupBy($("from_user"))
>
>
> .aggregate(call("firSendMsgFunc",$("crea

flink1.11 TableEnvironment 不支持注册 Aggregate Function?

2020-11-17 文章 lingchanhu
*flink1.11*
在TableEnvironment环境中注册并使用自定义的Aggregate
Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment 注册和使用则是正常,这应该说明自定义的函数是ok的)

org.apache.flink.table.api.TableException: Aggregate functions are not
updated to the new type system yet.
at
org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
at
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
at
org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
at
com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)

*// 以下是代码*
// main
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();

TableEnvironment tEnv = TableEnvironment.create(envSettings);

// 注册source table, jdbc table source
tEnv.executeSql("CREATE TABLE wx_event_log () with
('connect.type'='jdbc'),");

// 注册sink table,csv table sink
tEnv.executeSql("CREATE TABLE wx_data_statistics () with
('connect.type'='filesystem','format.type'='csv',.)");

// 注册agg function
tEnv.createTemporarySystemFunction("firSendMsgFunc",new FirstSendMsgFunc());

Table table2 = tEnv.sqlQuery("select from_user,create_time from wx_event_log
where msg_type='text' and create_time between '2020-03-20' and
'2020-03-21'");

table2.groupBy($("from_user"))
   
.aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today"))
.select($("from_user"),$("first_send_msg_today"))
.executeInsert("wx_data_statistics");


// 自定义agg function类
public class FirstSendMsgFunc extends
AggregateFunction {

public void accumulate(CountDTO acc, LocalDateTime createTime) {
if (acc.getDateTime() == null) {
acc.setDateTime(createTime);
} else if (acc.getDateTime().isAfter(createTime)) {
acc.setDateTime(createTime);
}
}

@Override
public LocalDateTime getValue(CountDTO acc) {
return acc.getDateTime();
}

@Override
public CountDTO createAccumulator() {
return new CountDTO();
}
}

// accumulate pojo 类
public class 

Re: flink1.11 sql ddl 连接kafka读取数据,使用事件时间窗口,无法触发,相同的sql使用系统时间触发没有问题

2020-11-14 文章 Jark Wu
重复的问题。我将刚刚的回答也贴在这里。

如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发:
1. 保证所有 partition 都有数据。
2. 且每个 partition 数据的 event time 都在前进
3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s =
11s

以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。

Best,
Jark

On Sat, 14 Nov 2020 at 16:35, 李世钰  wrote:

> 您好,请教您一个问题
> flink1.11 sql ddl连接kafka,使用事件事件,无法触发窗口,使用process_time系统时间就可以正常触发
> create table kafka_table (
> `log_id`  string,
> event_date timestamp(3),
> process_time as PROCTIME(),
> ts as event_date,
> watermark for ts as ts - interval '1' second
> ) with (
>  'connector' = 'kafka',
>  'topic' = 'kafka_table',
>  'properties.bootstrap.servers' = '10.2.12.3:9092',
>  'properties.group.id' = 'tmp-log-consumer003',
>  'format' = 'json',
>  'scan.startup.mode' = 'latest-offset'
> )
> 执行的sql是
> select TUMBLE_START(kafka_table.event_date, INTERVAL '10'
> SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10'
> SECOND),src_ip,count(dest_ip) from kafka_table group by
> TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip
>
>
>
>
> select log_id,process_time,ts from kafka_table查询的表结构如下
> 表结构为
> root
>  |-- log_id: STRING
>  |-- process_time: TIMESTAMP(3) NOT NULL *PROCTIME*
>  |-- ts: TIMESTAMP(3) *ROWTIME*
>
>
>  输入数据为
> log_id,process_time,ts
> 13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806
> 13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806
> 13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806
> 13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806
> 13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806
> 13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806


flink1.11 sql ddl 连接kafka读取数据,使用事件时间窗口,无法触发,相同的sql使用系统时间触发没有问题

2020-11-14 文章 李世钰
您好,请教您一个问题
flink1.11 sql ddl连接kafka,使用事件事件,无法触发窗口,使用process_time系统时间就可以正常触发
create table kafka_table (
`log_id`  string,
event_date timestamp(3),
process_time as PROCTIME(),
ts as event_date,
watermark for ts as ts - interval '1' second
) with (
 'connector' = 'kafka',
 'topic' = 'kafka_table',
 'properties.bootstrap.servers' = '10.2.12.3:9092',
 'properties.group.id' = 'tmp-log-consumer003',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
)
执行的sql是
select TUMBLE_START(kafka_table.event_date, INTERVAL '10' 
SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10' 
SECOND),src_ip,count(dest_ip) from kafka_table group by 
TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip




select log_id,process_time,ts from kafka_table查询的表结构如下
表结构为
root
 |-- log_id: STRING
 |-- process_time: TIMESTAMP(3) NOT NULL *PROCTIME*
 |-- ts: TIMESTAMP(3) *ROWTIME*


 输入数据为
log_id,process_time,ts
13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806
13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806
13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806
13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806
13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806
13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806

Re: 回复:flink1.11 读取kafka avro格式数据发序列化失败

2020-11-11 文章 Jark Wu
我估计你是用的 confluent schema registry 的 avro。
可以使用下在 master 分支提供的 avro-confluent format [1]。 需要自己 build 下源码。

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/avro-confluent.html

On Wed, 11 Nov 2020 at 14:20, 奔跑的小飞袁  wrote:

> 这是我尝试输出的message长度
> message length is: 529
> message length is: 212
> message length is: 391
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 回复:flink1.11 读取kafka avro格式数据发序列化失败

2020-11-10 文章 奔跑的小飞袁
这是我尝试输出的message长度
message length is: 529
message length is: 212
message length is: 391




--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:flink1.11 读取kafka avro格式数据发序列化失败

2020-11-10 文章 Shuai Xia
Hi,可以试下输出下message的长度么?


--
发件人:奔跑的小飞袁 
发送时间:2020年11月11日(星期三) 11:40
收件人:user-zh 
主 题:flink1.11 读取kafka avro格式数据发序列化失败

hello  我在使用flink1.11版本读取kafka
avro格式数据时遇到了错误,由于我们的avro特殊,因此源码稍微作了修改,以下是改动的代码片段
@Override
 public T deserialize(byte[] message) throws IOException {
  // read record
  checkAvroInitialized();
  inputStream.setBuffer(message);
  inputStream.skip(5);
  Schema readerSchema = getReaderSchema();
  GenericDatumReader datumReader = getDatumReader();
  datumReader.setSchema(readerSchema);
  return datumReader.read(null, decoder);
 }
源码包为:org.apache.flink.formats.avro.AvroDeserializationSchema

相同的改动在1.9.0是可以正常工作,我想知道在读取avro格式的数据这块社区是有过什么改动吗

以下是错误信息
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: org.apache.flink.client.program.ProgramInvocationException:
Job failed (JobID: f4d2bd903a55e2d10d67d69eadba618a)
 at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
 at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
 at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
 at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
 at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: f4d2bd903a55e2d10d67d69eadba618a)
 at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
 at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
 at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
 at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
 at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1681)
 at
com.intsig.flink.streaming.streaming_project.abtest.GenericRecordTest.main(GenericRecordTest.java:54)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 ... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: f4d2bd903a55e2d10d67d69eadba618a)
 at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
 at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
 at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
 at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
 at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
 at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
 at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
 at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at

flink1.11 读取kafka avro格式数据发序列化失败

2020-11-10 文章 奔跑的小飞袁
hello  我在使用flink1.11版本读取kafka
avro格式数据时遇到了错误,由于我们的avro特殊,因此源码稍微作了修改,以下是改动的代码片段
@Override
public T deserialize(byte[] message) throws IOException {
// read record
checkAvroInitialized();
inputStream.setBuffer(message);
inputStream.skip(5);
Schema readerSchema = getReaderSchema();
GenericDatumReader datumReader = getDatumReader();
datumReader.setSchema(readerSchema);
return datumReader.read(null, decoder);
}
源码包为:org.apache.flink.formats.avro.AvroDeserializationSchema

相同的改动在1.9.0是可以正常工作,我想知道在读取avro格式的数据这块社区是有过什么改动吗

以下是错误信息
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: org.apache.flink.client.program.ProgramInvocationException:
Job failed (JobID: f4d2bd903a55e2d10d67d69eadba618a)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: f4d2bd903a55e2d10d67d69eadba618a)
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:116)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1681)
at
com.intsig.flink.streaming.streaming_project.abtest.GenericRecordTest.main(GenericRecordTest.java:54)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: f4d2bd903a55e2d10d67d69eadba618a)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:116)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire

回复: flink1.11的cdc功能对消息顺序性的处理

2020-11-05 文章 史 正超
Canal可以配置分区策略:配置保证相同id的记录都发到同一个分区,比如 `db.table1:id`
这样就保证了数据的有序。

发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用

发件人: Jark Wu<mailto:imj...@gmail.com>
发送时间: 2020年11月5日 21:28
收件人: user-zh<mailto:user-zh@flink.apache.org>
主题: Re: flink1.11的cdc功能对消息顺序性的处理

我理解你说的是对 pk 的更新的场景。

比如一张 user 表,有[user_id, user_name] 2个字段,
假设有 "101, Tim" 记录 做了两次更新
update1:update test set id=102 where id=101;
update2: update test set id=103 wehre id=102;

针对这种场景 debezium 是会把这种针对 pk的更新拆成一条 delete 和一条 insert,而不是 update 消息。

所以 update1 语句产生了:
DELETE(101,Timo) 发到了p1
INSERT(102,Tim) 发到了 p2

update2 语句产生了:
DELETE(102, Tim) 发到了 p2
INSERT(103, Tim) 发到了 p3

所以 flink 去对接这个数据的时候,仍然能够最终数据是 (103, Tim), 因为 102 的两条数据,INSERT, DELETE
仍然是有序的。

所以如果 canal 对于 pk 更新也是同样的策略,那么也是一样的。 但我不确定 canal 是怎么处理 pk 更新的,这个需要调研下。

Best,
Jark

On Thu, 5 Nov 2020 at 21:05, hailongwang <18868816...@163.com> wrote:

> Hi,
>
> 可以看下 Jark 的 《基于 Flink SQL CDC 的实时数据同步方案》文章 [1]. 其中在最后的 Q&A 中描述了
> "首先需要 kafka 在分区中保证有序,同一个 key 的变更数据需要打入到同一个 kafka 的分区里面,这样 flink
> 读取的时候才能保证顺序。"
>
>
> 个人认为,需要 Update 的 key 可以更 canal 采集到 kakfa 的 hash key 一致,这样就保证了有序?
>
>
> [1] https://mp.weixin.qq.com/s/QNJlacBUlkMT7ksKKSNa5Q
>
>
> Best,
> Hailong Wang
>
>
>
>
>
> 在 2020-11-05 15:35:55,"18392099563" <18392099...@163.com> 写道:
> >hi everyone,
> >麻烦请教下各位大神,flink如何处理如下问题:
>
> >flink1.11引入cdc,可以解析canal和debezuim发送的CDC数据,其中canal一般是可以指定某些字段作为key进行hash分区发送到同一topic下的不同分区的。
> >如果源端短时间对pk值进行多次update,则有可能导致发往不同分区,从而无法保证顺序性。
> >假如
> >1.有源表和目标表:
> >create table test(
> >id int(10) primary key
> >)
> >2.源表的增量数据通过canal发往kafka,目标表接收kafka消息进行同步。
> >3.发往的topic下有三个partition:p0、p1、p2
> >4.源端和目标端都有一条记录id=1
> >
> >此时对源端进行两次update:
> >update1:update test set id=2 where id=1;
> >update2: update test set id=3 wehre id=2;
>
> >假如两条消息都在同一批message中发往kafka,其中update1发送到p1,pudate2发送到p2,这两条消息的顺序性是无法保证的,假如update2先到达,则目标端最终结果为id=2,与源端结果id=3不一致。
> >
> >
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>



Re: flink1.11的cdc功能对消息顺序性的处理

2020-11-05 文章 Jark Wu
我理解你说的是对 pk 的更新的场景。

比如一张 user 表,有[user_id, user_name] 2个字段,
假设有 "101, Tim" 记录 做了两次更新
update1:update test set id=102 where id=101;
update2: update test set id=103 wehre id=102;

针对这种场景 debezium 是会把这种针对 pk的更新拆成一条 delete 和一条 insert,而不是 update 消息。

所以 update1 语句产生了:
DELETE(101,Timo) 发到了p1
INSERT(102,Tim) 发到了 p2

update2 语句产生了:
DELETE(102, Tim) 发到了 p2
INSERT(103, Tim) 发到了 p3

所以 flink 去对接这个数据的时候,仍然能够最终数据是 (103, Tim), 因为 102 的两条数据,INSERT, DELETE
仍然是有序的。

所以如果 canal 对于 pk 更新也是同样的策略,那么也是一样的。 但我不确定 canal 是怎么处理 pk 更新的,这个需要调研下。

Best,
Jark

On Thu, 5 Nov 2020 at 21:05, hailongwang <18868816...@163.com> wrote:

> Hi,
>
> 可以看下 Jark 的 《基于 Flink SQL CDC 的实时数据同步方案》文章 [1]. 其中在最后的 Q&A 中描述了
> "首先需要 kafka 在分区中保证有序,同一个 key 的变更数据需要打入到同一个 kafka 的分区里面,这样 flink
> 读取的时候才能保证顺序。"
>
>
> 个人认为,需要 Update 的 key 可以更 canal 采集到 kakfa 的 hash key 一致,这样就保证了有序?
>
>
> [1] https://mp.weixin.qq.com/s/QNJlacBUlkMT7ksKKSNa5Q
>
>
> Best,
> Hailong Wang
>
>
>
>
>
> 在 2020-11-05 15:35:55,"18392099563" <18392099...@163.com> 写道:
> >hi everyone,
> >麻烦请教下各位大神,flink如何处理如下问题:
>
> >flink1.11引入cdc,可以解析canal和debezuim发送的CDC数据,其中canal一般是可以指定某些字段作为key进行hash分区发送到同一topic下的不同分区的。
> >如果源端短时间对pk值进行多次update,则有可能导致发往不同分区,从而无法保证顺序性。
> >假如
> >1.有源表和目标表:
> >create table test(
> >id int(10) primary key
> >)
> >2.源表的增量数据通过canal发往kafka,目标表接收kafka消息进行同步。
> >3.发往的topic下有三个partition:p0、p1、p2
> >4.源端和目标端都有一条记录id=1
> >
> >此时对源端进行两次update:
> >update1:update test set id=2 where id=1;
> >update2: update test set id=3 wehre id=2;
>
> >假如两条消息都在同一批message中发往kafka,其中update1发送到p1,pudate2发送到p2,这两条消息的顺序性是无法保证的,假如update2先到达,则目标端最终结果为id=2,与源端结果id=3不一致。
> >
> >
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink1.11的cdc功能对消息顺序性的处理

2020-11-05 文章 18392099563
hi everyone,
麻烦请教下各位大神,flink如何处理如下问题:
flink1.11引入cdc,可以解析canal和debezuim发送的CDC数据,其中canal一般是可以指定某些字段作为key进行hash分区发送到同一topic下的不同分区的。
如果源端短时间对pk值进行多次update,则有可能导致发往不同分区,从而无法保证顺序性。
假如
1.有源表和目标表:
create table test(
id int(10) primary key
)
2.源表的增量数据通过canal发往kafka,目标表接收kafka消息进行同步。
3.发往的topic下有三个partition:p0、p1、p2
4.源端和目标端都有一条记录id=1

此时对源端进行两次update:
update1:update test set id=2 where id=1;
update2: update test set id=3 wehre id=2;
假如两条消息都在同一批message中发往kafka,其中update1发送到p1,pudate2发送到p2,这两条消息的顺序性是无法保证的,假如update2先到达,则目标端最终结果为id=2,与源端结果id=3不一致。





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:回复: 回复: flink1.11连接mysql问题

2020-11-02 文章 hailongwang
Hi,
   应该是长时间没有传输数据导致 Connection invalid,具体修复 issue 
可见:https://issues.apache.org/jira/browse/FLINK-16681
   看这个 Fix 应该是在1.12 和 即将发布的 1.11.3 上。
  Best,
Hailong Wang

在 2020-11-02 13:20:54,"史 正超"  写道:
>是这样的,因为重试的时候 
>flink-jdbc-connector会把错误的堆栈打印出来的,然后再重连的,对任务没有影响,你的任务失败了吗?我的任务其实也有这个错误,但是任务没有失败,重新连接上mysql了。
>你仔细 看下你的日志里有没有下面的日志:
>JDBC executeBatch error, retry times = 1
>
>发件人: 酷酷的浑蛋 
>发送时间: 2020年11月2日 3:33
>收件人: user-zh@flink.apache.org 
>主题: 回复: flink1.11连接mysql问题
>
>标题上写的就是flink1.11啊
>
>
>
>
>在2020年11月2日 11:33,酷酷的浑蛋 写道:
>你看历史的回复,用的就是flink1.11,最新的flink-1.11.2也试过了还是有这个问题,而且我是在flink sql中使用
>
>
>
>
>在2020年11月2日 11:30,史 正超 写道:
>你用的flink哪个版本,flink-1.11已经加了连接失效重连的功能了,应该没有这个问题了。如果不是flink-1.11.x版本的,也可以参考flink-1.11的jdbc-connector的实现,或者用SELECT
> 1 语句保活连接。
>____
>发件人: 酷酷的浑蛋 
>发送时间: 2020年11月2日 2:28
>收件人: user-zh@flink.apache.org 
>主题: Re:回复:flink1.11连接mysql问题
>
>没有解决,隔一段时间就会报这个超时错误
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-10-14 17:33:30,"superainbower"  写道:
>HI
>链接超时的问题,你后来解决了吗?能告诉下怎么解决的吗?
>| |
>superainbower
>|
>|
>superainbo...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2020年08月31日 15:57,酷酷的浑蛋 写道:
>关键是在sql中怎么设置,connector=jdbc
>
>
>
>
>在2020年08月31日 15:06,13580506953<13580506...@163.com> 写道:
>这个问题本质是连接活性问题,
>连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)
>
>
>建议使用连接池druid进行连接活性保持
>
>
>原始邮件
>发件人: 酷酷的浑蛋
>收件人: user-zh
>发送时间: 2020年8月28日(周五) 15:02
>主题: flink1.11连接mysql问题
>
>
>com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
>successfully received from the server was 52,445,041 milliseconds ago. The 
>last packet sent successfully to the server was 52,445,045 milliseconds ago. 
>is longer than the server configured value of'wait_timeout'. You should 
>consider either expiring and/or testing connection validity before use in your 
>application, increasing the server configured values for client timeouts, 
>orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
>problem. flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用


回复: 回复: flink1.11连接mysql问题

2020-11-01 文章 史 正超
是这样的,因为重试的时候 
flink-jdbc-connector会把错误的堆栈打印出来的,然后再重连的,对任务没有影响,你的任务失败了吗?我的任务其实也有这个错误,但是任务没有失败,重新连接上mysql了。
你仔细 看下你的日志里有没有下面的日志:
JDBC executeBatch error, retry times = 1

发件人: 酷酷的浑蛋 
发送时间: 2020年11月2日 3:33
收件人: user-zh@flink.apache.org 
主题: 回复: flink1.11连接mysql问题

标题上写的就是flink1.11啊




在2020年11月2日 11:33,酷酷的浑蛋 写道:
你看历史的回复,用的就是flink1.11,最新的flink-1.11.2也试过了还是有这个问题,而且我是在flink sql中使用




在2020年11月2日 11:30,史 正超 写道:
你用的flink哪个版本,flink-1.11已经加了连接失效重连的功能了,应该没有这个问题了。如果不是flink-1.11.x版本的,也可以参考flink-1.11的jdbc-connector的实现,或者用SELECT
 1 语句保活连接。

发件人: 酷酷的浑蛋 
发送时间: 2020年11月2日 2:28
收件人: user-zh@flink.apache.org 
主题: Re:回复:flink1.11连接mysql问题

没有解决,隔一段时间就会报这个超时错误

















在 2020-10-14 17:33:30,"superainbower"  写道:
HI
链接超时的问题,你后来解决了吗?能告诉下怎么解决的吗?
| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制


在2020年08月31日 15:57,酷酷的浑蛋 写道:
关键是在sql中怎么设置,connector=jdbc




在2020年08月31日 15:06,13580506953<13580506...@163.com> 写道:
这个问题本质是连接活性问题,
连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)


建议使用连接池druid进行连接活性保持


原始邮件
发件人: 酷酷的浑蛋
收件人: user-zh
发送时间: 2020年8月28日(周五) 15:02
主题: flink1.11连接mysql问题


com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem. flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用


回复: flink1.11连接mysql问题

2020-11-01 文章 酷酷的浑蛋
标题上写的就是flink1.11啊




在2020年11月2日 11:33,酷酷的浑蛋 写道:
你看历史的回复,用的就是flink1.11,最新的flink-1.11.2也试过了还是有这个问题,而且我是在flink sql中使用




在2020年11月2日 11:30,史 正超 写道:
你用的flink哪个版本,flink-1.11已经加了连接失效重连的功能了,应该没有这个问题了。如果不是flink-1.11.x版本的,也可以参考flink-1.11的jdbc-connector的实现,或者用SELECT
 1 语句保活连接。

发件人: 酷酷的浑蛋 
发送时间: 2020年11月2日 2:28
收件人: user-zh@flink.apache.org 
主题: Re:回复:flink1.11连接mysql问题

没有解决,隔一段时间就会报这个超时错误

















在 2020-10-14 17:33:30,"superainbower"  写道:
HI
链接超时的问题,你后来解决了吗?能告诉下怎么解决的吗?
| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制


在2020年08月31日 15:57,酷酷的浑蛋 写道:
关键是在sql中怎么设置,connector=jdbc




在2020年08月31日 15:06,13580506953<13580506...@163.com> 写道:
这个问题本质是连接活性问题,
连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)


建议使用连接池druid进行连接活性保持


原始邮件
发件人: 酷酷的浑蛋
收件人: user-zh
发送时间: 2020年8月28日(周五) 15:02
主题: flink1.11连接mysql问题


com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem. flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用


回复: flink1.11连接mysql问题

2020-11-01 文章 酷酷的浑蛋
你看历史的回复,用的就是flink1.11,最新的flink-1.11.2也试过了还是有这个问题,而且我是在flink sql中使用




在2020年11月2日 11:30,史 正超 写道:
你用的flink哪个版本,flink-1.11已经加了连接失效重连的功能了,应该没有这个问题了。如果不是flink-1.11.x版本的,也可以参考flink-1.11的jdbc-connector的实现,或者用SELECT
 1 语句保活连接。

发件人: 酷酷的浑蛋 
发送时间: 2020年11月2日 2:28
收件人: user-zh@flink.apache.org 
主题: Re:回复:flink1.11连接mysql问题

没有解决,隔一段时间就会报这个超时错误

















在 2020-10-14 17:33:30,"superainbower"  写道:
HI
链接超时的问题,你后来解决了吗?能告诉下怎么解决的吗?
| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制


在2020年08月31日 15:57,酷酷的浑蛋 写道:
关键是在sql中怎么设置,connector=jdbc




在2020年08月31日 15:06,13580506953<13580506...@163.com> 写道:
这个问题本质是连接活性问题,
连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)


建议使用连接池druid进行连接活性保持


原始邮件
发件人: 酷酷的浑蛋
收件人: user-zh
发送时间: 2020年8月28日(周五) 15:02
主题: flink1.11连接mysql问题


com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem. flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用


回复: Re:回复:flink1.11连接mysql问题

2020-11-01 文章 史 正超
你用的flink哪个版本,flink-1.11已经加了连接失效重连的功能了,应该没有这个问题了。如果不是flink-1.11.x版本的,也可以参考flink-1.11的jdbc-connector的实现,或者用SELECT
 1 语句保活连接。

发件人: 酷酷的浑蛋 
发送时间: 2020年11月2日 2:28
收件人: user-zh@flink.apache.org 
主题: Re:回复:flink1.11连接mysql问题

没有解决,隔一段时间就会报这个超时错误

















在 2020-10-14 17:33:30,"superainbower"  写道:
>HI
>链接超时的问题,你后来解决了吗?能告诉下怎么解决的吗?
>| |
>superainbower
>|
>|
>superainbo...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2020年08月31日 15:57,酷酷的浑蛋 写道:
>关键是在sql中怎么设置,connector=jdbc
>
>
>
>
>在2020年08月31日 15:06,13580506953<13580506...@163.com> 写道:
>这个问题本质是连接活性问题,
>连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)
>
>
>建议使用连接池druid进行连接活性保持
>
>
>原始邮件
>发件人: 酷酷的浑蛋
>收件人: user-zh
>发送时间: 2020年8月28日(周五) 15:02
>主题: flink1.11连接mysql问题
>
>
>com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
>successfully received from the server was 52,445,041 milliseconds ago. The 
>last packet sent successfully to the server was 52,445,045 milliseconds ago. 
>is longer than the server configured value of'wait_timeout'. You should 
>consider either expiring and/or testing connection validity before use in your 
>application, increasing the server configured values for client timeouts, 
>orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
>problem. flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用


Re:回复:flink1.11连接mysql问题

2020-11-01 文章 酷酷的浑蛋
没有解决,隔一段时间就会报这个超时错误

















在 2020-10-14 17:33:30,"superainbower"  写道:
>HI
>链接超时的问题,你后来解决了吗?能告诉下怎么解决的吗?
>| |
>superainbower
>|
>|
>superainbo...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2020年08月31日 15:57,酷酷的浑蛋 写道:
>关键是在sql中怎么设置,connector=jdbc
>
>
>
>
>在2020年08月31日 15:06,13580506953<13580506...@163.com> 写道:
>这个问题本质是连接活性问题,
>连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)
>
>
>建议使用连接池druid进行连接活性保持
>
>
>原始邮件
>发件人: 酷酷的浑蛋
>收件人: user-zh
>发送时间: 2020年8月28日(周五) 15:02
>主题: flink1.11连接mysql问题
>
>
>com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
>successfully received from the server was 52,445,041 milliseconds ago. The 
>last packet sent successfully to the server was 52,445,045 milliseconds ago. 
>is longer than the server configured value of'wait_timeout'. You should 
>consider either expiring and/or testing connection validity before use in your 
>application, increasing the server configured values for client timeouts, 
>orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
>problem. flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用


Re: flink1.11 elasticsearch connector

2020-11-01 文章 zhisheng
hi,

可以自己根据社区的代码进行重编译,改成自己公司的依赖名,推送自公司的 nexus。

Best
zhisheng

Yangze Guo  于2020年10月29日周四 下午4:00写道:

> 1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-18361
>
> Best,
> Yangze Guo
>
> On Thu, Oct 29, 2020 at 3:37 PM 赵帅  wrote:
> >
> > elasticsearch7.6有账号认证,目前flink1.11 elasticsearch connector sql
> api如何加入账号认证?
>


Re: flink1.11 kafka connector

2020-11-01 文章 zhisheng
hi,

应该是可以继承 FlinkKafkaPartitioner 接口,自己重写 partition 方法实现 hash(key) 的功能

eg:

public class MyCustomPartitioner extends FlinkKafkaPartitioner> {

@Override
public int partition(Map map, byte[] key, byte[] value,
String targetTopic, int[] partitions) {
String key = map.get(xxx).toString();
return partitions[Math.abs(key.hashCode() % partitions.length)];
}
}

Best!
zhisheng

Jark Wu  于2020年10月29日周四 下午2:33写道:

> 多谢创建 issue。
>
> side comment: 1.12 中 kafka connector 将支持声明 message key 部分,当声明了 message key
> 部分,就自动会按照 key 来做 hash 到某个固定分区。
>
> Best,
> Jark
>
> On Thu, 29 Oct 2020 at 14:27, Dream-底限  wrote:
>
> > hi、
> > 好的,https://issues.apache.org/jira/browse/FLINK-19871
> >
> > Jark Wu  于2020年10月29日周四 下午12:06写道:
> >
> > > 目前还不支持,可以去社区开个 issue,看能不能赶上1.12
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > On Thu, 29 Oct 2020 at 11:26, Dream-底限  wrote:
> > >
> > > > hi、
> > > > 我看了一下官方提供的kafka sink,对于数据发送方式为两种:对于第二种情况,有办法保证对于指定主键的变化过程发送到同一个kafka
> > > > partiiton吗,或者说社区对于原生hash(key)到kafka分区映射的方式有支持计划吗
> > > >
> > > >- fixed:每个Flink分区最多只能有一个Kafka分区。
> > > >- round-robin:Flink分区循环分配给Kafka分区。
> > > >
> > >
> >
>


Re: flink1.11 elasticsearch connector

2020-10-29 文章 Yangze Guo
1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]

[1] https://issues.apache.org/jira/browse/FLINK-18361

Best,
Yangze Guo

On Thu, Oct 29, 2020 at 3:37 PM 赵帅  wrote:
>
> elasticsearch7.6有账号认证,目前flink1.11 elasticsearch connector sql api如何加入账号认证?


flink1.11 elasticsearch connector

2020-10-29 文章 赵帅
elasticsearch7.6有账号认证,目前flink1.11 elasticsearch connector sql api如何加入账号认证?

Re: flink1.11 kafka connector

2020-10-28 文章 Jark Wu
多谢创建 issue。

side comment: 1.12 中 kafka connector 将支持声明 message key 部分,当声明了 message key
部分,就自动会按照 key 来做 hash 到某个固定分区。

Best,
Jark

On Thu, 29 Oct 2020 at 14:27, Dream-底限  wrote:

> hi、
> 好的,https://issues.apache.org/jira/browse/FLINK-19871
>
> Jark Wu  于2020年10月29日周四 下午12:06写道:
>
> > 目前还不支持,可以去社区开个 issue,看能不能赶上1.12
> >
> > Best,
> > Jark
> >
> >
> > On Thu, 29 Oct 2020 at 11:26, Dream-底限  wrote:
> >
> > > hi、
> > > 我看了一下官方提供的kafka sink,对于数据发送方式为两种:对于第二种情况,有办法保证对于指定主键的变化过程发送到同一个kafka
> > > partiiton吗,或者说社区对于原生hash(key)到kafka分区映射的方式有支持计划吗
> > >
> > >- fixed:每个Flink分区最多只能有一个Kafka分区。
> > >- round-robin:Flink分区循环分配给Kafka分区。
> > >
> >
>


Re: flink1.11 kafka connector

2020-10-28 文章 Dream-底限
hi、
好的,https://issues.apache.org/jira/browse/FLINK-19871

Jark Wu  于2020年10月29日周四 下午12:06写道:

> 目前还不支持,可以去社区开个 issue,看能不能赶上1.12
>
> Best,
> Jark
>
>
> On Thu, 29 Oct 2020 at 11:26, Dream-底限  wrote:
>
> > hi、
> > 我看了一下官方提供的kafka sink,对于数据发送方式为两种:对于第二种情况,有办法保证对于指定主键的变化过程发送到同一个kafka
> > partiiton吗,或者说社区对于原生hash(key)到kafka分区映射的方式有支持计划吗
> >
> >- fixed:每个Flink分区最多只能有一个Kafka分区。
> >- round-robin:Flink分区循环分配给Kafka分区。
> >
>


Re: flink1.11 kafka connector

2020-10-28 文章 Jark Wu
目前还不支持,可以去社区开个 issue,看能不能赶上1.12

Best,
Jark


On Thu, 29 Oct 2020 at 11:26, Dream-底限  wrote:

> hi、
> 我看了一下官方提供的kafka sink,对于数据发送方式为两种:对于第二种情况,有办法保证对于指定主键的变化过程发送到同一个kafka
> partiiton吗,或者说社区对于原生hash(key)到kafka分区映射的方式有支持计划吗
>
>- fixed:每个Flink分区最多只能有一个Kafka分区。
>- round-robin:Flink分区循环分配给Kafka分区。
>


flink1.11 kafka connector

2020-10-28 文章 Dream-底限
hi、
我看了一下官方提供的kafka sink,对于数据发送方式为两种:对于第二种情况,有办法保证对于指定主键的变化过程发送到同一个kafka
partiiton吗,或者说社区对于原生hash(key)到kafka分区映射的方式有支持计划吗

   - fixed:每个Flink分区最多只能有一个Kafka分区。
   - round-robin:Flink分区循环分配给Kafka分区。


Re: flink1.11日志上报

2020-10-28 文章 m13162790856
我们这边也是这样搜集日志上报  es 保留最近一个月的数据不回保留全部数据


在 2020年10月27日 20:48,zhisheng 写道:


弱弱的问一下,你们集群作业数量大概多少?因为用户可能打印原始数据在日志里面,这个数据量确实还是很大的,全部将日志打到 ES 每月需要多少成本啊? 
Storm☀️  于2020年10月27日周二 下午8:37写道: > 
我们也是用的kafkaappender进行日志上报,然后在ES中提供日志检索 > > > > -- > Sent from: 
http://apache-flink.147419.n8.nabble.com/ >

Re: flink1.11日志上报

2020-10-27 文章 zhisheng
弱弱的问一下,你们集群作业数量大概多少?因为用户可能打印原始数据在日志里面,这个数据量确实还是很大的,全部将日志打到 ES 每月需要多少成本啊?

Storm☀️  于2020年10月27日周二 下午8:37写道:

> 我们也是用的kafkaappender进行日志上报,然后在ES中提供日志检索
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.11日志上报

2020-10-27 文章 Storm☀️
我们也是用的kafkaappender进行日志上报,然后在ES中提供日志检索



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:关于flink1.11 新版本使用dataStreamEnv.execute 和tableEnv.execute出现No operators defined in streaming topology. Cannot generate StreamGraph.的问题

2020-10-26 文章 Shuai Xia
1.11之前TableEnvironmentImpl与StreamExecutionEnvironment的execute方法实现一致
无论用哪一个都可以
1.11修改了TableEnvironmentImpl中execute的实现逻辑
如果代码中涉及了DataStream的操作,则需要使用StreamExecutionEnvironment的execute方法

简单概述为:
 StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业
 Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业
 新引入的 TableEnvironment.executeSql() 方法是直接执行sql作业 (异步提交作业),不需要再调用 
StreamTableEnvironment.execute() 或 StreamExecutionEnvironment.execute()


--
发件人:me 
发送时间:2020年10月26日(星期一) 09:13
收件人:user-zh 
主 题:关于flink1.11 新版本使用dataStreamEnv.execute 和tableEnv.execute出现No operators 
defined in streaming topology. Cannot generate StreamGraph.的问题

关于flink1.11 新版本使用dataStreamEnv.execute 和tableEnv.execute出现No operators defined 
in streaming topology. Cannot generate StreamGraph.的问题


-| 程序内部使用Table API同时也有 Table转为 Datastream的场景。
-|程序内部有使用flinksql 读写kafka,从而执行 sqlUpdate  






尝试使用新版api 只使用tableEnv.executeSql 从而不加 dataStreamEnv.execute 和tableEnv.execute 
但是会出现程序执行一个开头就结束了而且没有异常。


求问新老API兼容吗?Table和Datastream同时存在的时候使用dataStreamEnv.execute 
还是tableEnv.execute?

Re: 请问flink1.11版本如何设置checkpoint的默认保存个数

2020-10-26 文章 赵一旦
是的。不过不是bug。flink将webui显示多少history ckpt以及实际保留多少ckpt当作2个配置。
并且,这2配置的大小无关系。
如果webui(20),retain(10)。那么实际webui的后10个ckpt路径是不存在的,不会保留。
反之,webui(10),retain(20)。虽然部分ckpt在ui上不显示,但实际会保留。

2个参数去flink文档都能找到的哈。

cxydeve...@163.com  于2020年10月26日周一 上午11:34写道:

> 知道问题所在了,那个在配置文件中设置state.checkpoints.num-retained是生效的
> 在webui,任务checkpoint的history中总是显示10条最新的记录(我以为就是一定是保留了最新的10份数据),
> 但是其实真正持久化有数据的数目是根据state.checkpoints.num-retained的值
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 请问flink1.11版本如何设置checkpoint的默认保存个数

2020-10-25 文章 cxydeve...@163.com
知道问题所在了,那个在配置文件中设置state.checkpoints.num-retained是生效的
在webui,任务checkpoint的history中总是显示10条最新的记录(我以为就是一定是保留了最新的10份数据),
但是其实真正持久化有数据的数目是根据state.checkpoints.num-retained的值



--
Sent from: http://apache-flink.147419.n8.nabble.com/

关于flink1.11 新版本使用dataStreamEnv.execute 和tableEnv.execute出现No operators defined in streaming topology. Cannot generate StreamGraph.的问题

2020-10-25 文章 李世钰
关于flink1.11 新版本使用dataStreamEnv.execute 和tableEnv.execute出现No operators defined 
in streaming topology. Cannot generate StreamGraph.的问题




-| 程序内部使用Table API同时也有 Table转为 Datastream的场景。

-|程序内部有使用flinksql 读写kafka,从而执行 sqlUpdate  










尝试使用新版api 只使用tableEnv.executeSql 从而不加 dataStreamEnv.execute 
和tableEnv.execute 但是会出现程序执行一个开头就结束了而且没有异常。




求问新老API兼容吗?Table和Datastream同时存在的时候使用dataStreamEnv.execute 
还是tableEnv.execute?

关于flink1.11 新版本使用dataStreamEnv.execute 和tableEnv.execute出现No operators defined in streaming topology. Cannot generate StreamGraph.的问题

2020-10-25 文章 me
关于flink1.11 新版本使用dataStreamEnv.execute 和tableEnv.execute出现No operators defined 
in streaming topology. Cannot generate StreamGraph.的问题


-| 程序内部使用Table API同时也有 Table转为 Datastream的场景。
-|程序内部有使用flinksql 读写kafka,从而执行 sqlUpdate  






尝试使用新版api 只使用tableEnv.executeSql 从而不加 dataStreamEnv.execute 和tableEnv.execute 
但是会出现程序执行一个开头就结束了而且没有异常。


求问新老API兼容吗?Table和Datastream同时存在的时候使用dataStreamEnv.execute 
还是tableEnv.execute?

请问flink1.11版本如何设置checkpoint的默认保存个数

2020-10-22 文章 chenxuying
我看官方文档[1]应该是设置state.checkpoints.num-retained , 默认是1, 但是设置了没有效果, 官方说默认是1, 
但是我发现好像是10 , 
同时我也设置了其他的属性,比如
execution.checkpointing.externalized-checkpoint-retention: 
RETAIN_ON_CANCELLATION
是可行,所以我的设置应该没有什么问题


[1]:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained



Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-22 文章 amen...@163.com
是的,正如@chenxuying 和@zhisheng 所说,

我这边采用的方案是通过pipeline.classpaths参数将需要的udf jar添加到类路径中,但是当task被分配到tm去执行时仍需要找到所需udf 
jar才行,所以在1.11版本中我采用-yt参数将/plugins插件目录上传至hdfs,即可解决这个问题~

best,
amenhub



 
发件人: zhisheng
发送时间: 2020-10-22 23:28
收件人: user-zh
主题: Re: flink1.11加载外部jar包进行UDF注册
hi
 
flink  1.11 如果是要管理 udf jar 的话应该是可以通过 yarn-provided-lib-dirs [1] 这个参数去控制 udf
jar 的路径地址,ps,这个参数只在 1.11 才支持
 
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs
 
Best
zhisheng
 
Husky Zeng <568793...@qq.com> 于2020年10月22日周四 上午11:31写道:
 
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html
>
>
>
> https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927
>
>
>
> 我们也在搞一个从hdfs路径加载udf的功能,你看下是不是同一个问题?可以交流一下。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.11加载外部jar包进行UDF注册

2020-10-22 文章 zhisheng
hi

flink  1.11 如果是要管理 udf jar 的话应该是可以通过 yarn-provided-lib-dirs [1] 这个参数去控制 udf
jar 的路径地址,ps,这个参数只在 1.11 才支持

 [1]
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs

Best
zhisheng

Husky Zeng <568793...@qq.com> 于2020年10月22日周四 上午11:31写道:

>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html
>
>
>
> https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927
>
>
>
> 我们也在搞一个从hdfs路径加载udf的功能,你看下是不是同一个问题?可以交流一下。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.11加载外部jar包进行UDF注册

2020-10-21 文章 Husky Zeng
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html


https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927



我们也在搞一个从hdfs路径加载udf的功能,你看下是不是同一个问题?可以交流一下。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-16 文章 chenxuying
1使用系统类加载器的时候,本身作业包开放给外部UDF jar包实现的接口会报ClassNotFound异常
2线程上下文类加载器是什么

不太明白这两点,可以写个代码例子看看吗


在 2020-10-15 19:47:20,"amen...@163.com"  写道:
>追加问题,在使用线程上下文类加载器的时候,数据会重复发送三条,这是因为添加pipeline.classpaths的缘故吗?
>那这种设置env的方式有可能还会造成其他什么问题?
>
>best,
>amenhub
> 
>发件人: amen...@163.com
>发送时间: 2020-10-15 19:22
>收件人: user-zh
>主题: Re: Re: flink1.11加载外部jar包进行UDF注册
>非常感谢您的回复!
> 
>对于我来说,这是一个非常好的办法,但是我有一个疑问:类加载器只能使用系统类加载器进行加载吗?
>因为我在尝试使用系统类加载器的时候,本身作业包开放给外部UDF 
>jar包实现的接口会报ClassNotFound异常,而将类加载器指向主类(这种方式的话这里应该是使用默认的线程上下文加载器),则可避免这个问题。
> 
>期待您的回复,谢谢~
> 
>best, 
>amenhub
>发件人: cxydeve...@163.com
>发送时间: 2020-10-15 17:46
>收件人: user-zh
>主题: Re: flink1.11加载外部jar包进行UDF注册
>我们用方法是通过反射设置env的配置,增加pipeline.classpaths
>具体代码如下
>public static void main(final String[] args) throws Exception {
>StreamExecutionEnvironment env =
>StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings settings =
>EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment tableEnvironment =
>StreamTableEnvironment.create(env, settings);
>//String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
>String path = "https://...xxx.jar";;
>loadJar(new URL(path));
>Field configuration =
>StreamExecutionEnvironment.class.getDeclaredField("configuration");
>configuration.setAccessible(true);
>Configuration o = (Configuration)configuration.get(env);
>Field confData = Configuration.class.getDeclaredField("confData");
>confData.setAccessible(true);
>Map temp = (Map)confData.get(o);
>List jarList = new ArrayList<>();
>jarList.add(path);
>temp.put("pipeline.classpaths",jarList);
>tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
>'flinksql.function.udf.CxyTestReturnSelf'");
>tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
>" f_sequence INT,\n" +
>" f_random INT,\n" +
>" f_random_str STRING,\n" +
>" ts AS localtimestamp,\n" +
>" WATERMARK FOR ts AS ts\n" +
>") WITH (\n" +
>" 'connector' = 'datagen',\n" +
>" 'rows-per-second'='5',\n" +
>"\n" +
>" 'fields.f_sequence.kind'='sequence',\n" +
>" 'fields.f_sequence.start'='1',\n" +
>" 'fields.f_sequence.end'='1000',\n" +
>"\n" +
>" 'fields.f_random.min'='1',\n" +
>" 'fields.f_random.max'='1000',\n" +
>"\n" +
>" 'fields.f_random_str.length'='10'\n" +
>")");
>tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
>"f_random_str STRING" +
>") WITH (\n" +
>"'connector' = 'print'\n" +
>")");
>tableEnvironment.executeSql(
>"insert into sinktable " +
>"select CxyTestReturnSelf(f_random_str) " +
>"from sourceTable");
>}
>//动态加载Jar
>public static void loadJar(URL jarUrl) {
>//从URLClassLoader类加载器中获取类的addURL方法
>Method method = null;
>try {
>method = URLClassLoader.class.getDeclaredMethod("addURL",
>URL.class);
>} catch (NoSuchMethodException | SecurityException e1) {
>e1.printStackTrace();
>}
>// 获取方法的访问权限
>boolean accessible = method.isAccessible();
>try {
>//修改访问权限为可写
>if (accessible == false) {
>method.setAccessible(true);
>}
>// 获取系统类加载器
>URLClassLoader classLoader = (URLClassLoader)
>ClassLoader.getSystemClassLoader();
>//jar路径加入到系统url路径里
>method.invoke(classLoader, jarUrl);
>} catch (Exception e) {
>e.printStackTrace();
>} finally {
>method.setAccessible(accessible);
>}
>}
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-15 文章 amen...@163.com
追加问题,在使用线程上下文类加载器的时候,数据会重复发送三条,这是因为添加pipeline.classpaths的缘故吗?
那这种设置env的方式有可能还会造成其他什么问题?

best,
amenhub
 
发件人: amen...@163.com
发送时间: 2020-10-15 19:22
收件人: user-zh
主题: Re: Re: flink1.11加载外部jar包进行UDF注册
非常感谢您的回复!
 
对于我来说,这是一个非常好的办法,但是我有一个疑问:类加载器只能使用系统类加载器进行加载吗?
因为我在尝试使用系统类加载器的时候,本身作业包开放给外部UDF 
jar包实现的接口会报ClassNotFound异常,而将类加载器指向主类(这种方式的话这里应该是使用默认的线程上下文加载器),则可避免这个问题。
 
期待您的回复,谢谢~
 
best, 
amenhub
发件人: cxydeve...@163.com
发送时间: 2020-10-15 17:46
收件人: user-zh
主题: Re: flink1.11加载外部jar包进行UDF注册
我们用方法是通过反射设置env的配置,增加pipeline.classpaths
具体代码如下
public static void main(final String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, settings);
//String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
String path = "https://...xxx.jar";;
loadJar(new URL(path));
Field configuration =
StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration)configuration.get(env);
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map temp = (Map)confData.get(o);
List jarList = new ArrayList<>();
jarList.add(path);
temp.put("pipeline.classpaths",jarList);
tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
'flinksql.function.udf.CxyTestReturnSelf'");
tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='5',\n" +
"\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
"\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
"\n" +
" 'fields.f_random_str.length'='10'\n" +
")");
tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
"f_random_str STRING" +
") WITH (\n" +
"'connector' = 'print'\n" +
")");
tableEnvironment.executeSql(
"insert into sinktable " +
"select CxyTestReturnSelf(f_random_str) " +
"from sourceTable");
}
//动态加载Jar
public static void loadJar(URL jarUrl) {
//从URLClassLoader类加载器中获取类的addURL方法
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL",
URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// 获取方法的访问权限
boolean accessible = method.isAccessible();
try {
//修改访问权限为可写
if (accessible == false) {
method.setAccessible(true);
}
// 获取系统类加载器
URLClassLoader classLoader = (URLClassLoader)
ClassLoader.getSystemClassLoader();
//jar路径加入到系统url路径里
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-15 文章 amen...@163.com
非常感谢您的回复!

对于我来说,这是一个非常好的办法,但是我有一个疑问:类加载器只能使用系统类加载器进行加载吗?
因为我在尝试使用系统类加载器的时候,本身作业包开放给外部UDF 
jar包实现的接口会报ClassNotFound异常,而将类加载器指向主类(这种方式的话这里应该是使用默认的线程上下文加载器),则可避免这个问题。

期待您的回复,谢谢~

best, 
amenhub
 
发件人: cxydeve...@163.com
发送时间: 2020-10-15 17:46
收件人: user-zh
主题: Re: flink1.11加载外部jar包进行UDF注册
我们用方法是通过反射设置env的配置,增加pipeline.classpaths
具体代码如下
public static void main(final String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, settings);
//String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
String path = "https://...xxx.jar";;
loadJar(new URL(path));
 
Field configuration =
StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration)configuration.get(env);
 
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map temp = (Map)confData.get(o);
List jarList = new ArrayList<>();
jarList.add(path);
temp.put("pipeline.classpaths",jarList);
 
tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
'flinksql.function.udf.CxyTestReturnSelf'");
tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='5',\n" +
"\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
"\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
"\n" +
" 'fields.f_random_str.length'='10'\n" +
")");
tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
"f_random_str STRING" +
") WITH (\n" +
"'connector' = 'print'\n" +
")");
tableEnvironment.executeSql(
"insert into sinktable " +
"select CxyTestReturnSelf(f_random_str) " +
"from sourceTable");
}
//动态加载Jar
public static void loadJar(URL jarUrl) {
//从URLClassLoader类加载器中获取类的addURL方法
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL",
URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// 获取方法的访问权限
boolean accessible = method.isAccessible();
try {
//修改访问权限为可写
if (accessible == false) {
method.setAccessible(true);
}
// 获取系统类加载器
URLClassLoader classLoader = (URLClassLoader)
ClassLoader.getSystemClassLoader();
//jar路径加入到系统url路径里
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11加载外部jar包进行UDF注册

2020-10-15 文章 cxydeve...@163.com
我们用方法是通过反射设置env的配置,增加pipeline.classpaths
具体代码如下
public static void main(final String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, settings);
//String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
String path = "https://...xxx.jar";;
loadJar(new URL(path));

Field configuration =
StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration)configuration.get(env);

Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map temp = (Map)confData.get(o);
List jarList = new ArrayList<>();
jarList.add(path);
temp.put("pipeline.classpaths",jarList);

tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
'flinksql.function.udf.CxyTestReturnSelf'");
tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='5',\n" +
"\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
"\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
"\n" +
" 'fields.f_random_str.length'='10'\n" +
")");
tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
"f_random_str STRING" +
") WITH (\n" +
"'connector' = 'print'\n" +
")");
tableEnvironment.executeSql(
"insert into sinktable " +
"select CxyTestReturnSelf(f_random_str) " +
"from sourceTable");
}

//动态加载Jar
public static void loadJar(URL jarUrl) {
//从URLClassLoader类加载器中获取类的addURL方法
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL",
URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// 获取方法的访问权限
boolean accessible = method.isAccessible();
try {
//修改访问权限为可写
if (accessible == false) {
method.setAccessible(true);
}
// 获取系统类加载器
URLClassLoader classLoader = (URLClassLoader)
ClassLoader.getSystemClassLoader();
//jar路径加入到系统url路径里
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:flink1.11连接mysql问题

2020-10-14 文章 superainbower
HI
链接超时的问题,你后来解决了吗?能告诉下怎么解决的吗?
| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制


在2020年08月31日 15:57,酷酷的浑蛋 写道:
关键是在sql中怎么设置,connector=jdbc




在2020年08月31日 15:06,13580506953<13580506...@163.com> 写道:
这个问题本质是连接活性问题,
连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)


建议使用连接池druid进行连接活性保持


原始邮件
发件人: 酷酷的浑蛋
收件人: user-zh
发送时间: 2020年8月28日(周五) 15:02
主题: flink1.11连接mysql问题


com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem. flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用

flink1.11加载外部jar包进行UDF注册

2020-10-13 文章 amen...@163.com
hi, everyone

近期有做一个关于从外部路径加载UDF的开发,但报了如下异常:(截取主要的异常信息)

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: com.xxx.xxx.udf.Uppercase
ClassLoader info: URL ClassLoader:
file: 
'/tmp/blobStore-1318a525-1b5a-4c07-808e-f62083c3fb11/job_a5501605ff554915a81ae12e3018e77d/blob_p-b0411adc6fb3d602ed03076ddc3d1bf3e6a63319-48d1e8f3c1b25d4e2b78242429834e31'
 (valid JAR)
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:288)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:155)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) 
~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassNotFoundException: com.xxx.xxx.udf.Uppercase
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_171]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_171]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_171]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_171]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_171]

我这边猜测的原因是进行外部jar包加载进行createTemporarySystemFunction的时候,在flink运行环境中没有将外部jar加载进来,但对这个猜测也还是很迷惑,菜鸟操作,希望有大佬帮忙提供解决方案或思路,谢谢~

best,
amenhub



Re: flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1

2020-10-09 文章 Rui Li
Hi,

实时写hive有一个已知的性能问题:https://issues.apache.org/jira/browse/FLINK-19121
建议打一个这个patch再试试。

On Tue, Sep 29, 2020 at 7:12 PM Jun Zhang <825875...@qq.com> wrote:

> 你的kafka的分区数是多少,把flink的并行度加大到kafka的分区数。
>
>
>
> Best  Jun
>
>
> -- 原始邮件 --
> 发件人: me  发送时间: 2020年9月29日 19:08
> 收件人: user-zh  主题: 回复:flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1
>
>
>
> flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1
> tableEnv.executeSql("insert into dwd_security_log select * from " + table)
>
> 实际写入hive之后,查看hdfs上写入的文件为19M,这是60秒内写入hive的,flink流式写入hive通过checkpotin来把数据刷入hive中。
>
>
> 请问大家只有有什么提升写入速度的参数或者方式吗?



-- 
Best regards!
Rui Li


Re:Re: flink1.11流式写入hive速度慢的问题

2020-10-09 文章 me
flink1.11大家有没有遇到写入hive速度慢的问题,加到并行度之后,写入速度1000条/秒,写入性能还是很差劲,完全不满足需要,要怎么把实时的数据写入hive中?
flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有1000条/秒,
Datastream 是直接读取的kafka数据,速度现在是1条每秒,现在只能写入1000条/每秒
val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
chaitin_test.printSchema()
tableEnv.executeSql("insert into chaitin_test select test from " + chaitin_test)


 原始邮件 
发件人: me
收件人: user-zh
发送时间: 2020年10月9日(周五) 17:33
主题: Re:Re: flink1.11流式写入hive速度慢的问题


您的回复感觉一点关系也没有,有点打广告的嫌疑 原始邮件 发件人: Michael Ran 收件人: 
user-zh 发送时间: 2020年10月9日(周五) 17:14 主题: Re:Re: 
flink1.11流式写入hive速度慢的问题 不知道现在flink 能否直接获取hive 文件写入。以前直接用jdbc 写hive 
速度本来就快不起来,每次都要生成文件。 如果先写文件,文件写好了再进行一次load 就会快很多 在 2020-10-09 15:55:15,"Jingsong 
Li"  写道: >Hi, >是Hive表吧? 
>https://issues.apache.org/jira/browse/FLINK-19121 在1.11.3中才fix,这个是影响性能的 > 
>可以下载最新的1.11分支的Hive依赖来试下: 
>https://repository.apache.org/snapshots/org/apache/flink/ 
>(比如你用hive-1.2.2依赖,你可以下载 
>https://repository.apache.org/content/groups/snapshots/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.11-SNAPSHOT/flink-sql-connector-hive-1.2.2_2.11-1.11-20201008.202441-190.jar
 >) > >Best, >Jingsong > >On Fri, Oct 9, 2020 at 3:50 PM me  
wrote: > >> dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table >> >> >> 
原始邮件 >> 发件人: me >> 收件人: user-zh >> 
发送时间: 2020年10月9日(周五) 15:34 >> 主题: flink1.11流式写入hive速度慢的问题 >> >> >> flink1.11 
将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒 >> val chaitin_test = 
tableEnv.fromDataStream(dataStream,'test) >> chaitin_test.printSchema() >> 
tableEnv.executeSql("insert into chaitin_test select test from " + >> 
chaitin_test) > > > >-- >Best, Jingsong Lee

Re:Re: flink1.11流式写入hive速度慢的问题

2020-10-09 文章 me
您的回复感觉一点关系也没有,有点打广告的嫌疑


 原始邮件 
发件人: Michael Ran
收件人: user-zh
发送时间: 2020年10月9日(周五) 17:14
主题: Re:Re: flink1.11流式写入hive速度慢的问题


不知道现在flink 能否直接获取hive 文件写入。以前直接用jdbc 写hive 速度本来就快不起来,每次都要生成文件。 
如果先写文件,文件写好了再进行一次load 就会快很多 在 2020-10-09 15:55:15,"Jingsong Li" 
 写道: >Hi, >是Hive表吧? 
>https://issues.apache.org/jira/browse/FLINK-19121 在1.11.3中才fix,这个是影响性能的 > 
>可以下载最新的1.11分支的Hive依赖来试下: 
>https://repository.apache.org/snapshots/org/apache/flink/ 
>(比如你用hive-1.2.2依赖,你可以下载 
>https://repository.apache.org/content/groups/snapshots/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.11-SNAPSHOT/flink-sql-connector-hive-1.2.2_2.11-1.11-20201008.202441-190.jar
 >) > >Best, >Jingsong > >On Fri, Oct 9, 2020 at 3:50 PM me  
wrote: > >> dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table >> >> >> 
原始邮件 >> 发件人: me >> 收件人: user-zh >> 
发送时间: 2020年10月9日(周五) 15:34 >> 主题: flink1.11流式写入hive速度慢的问题 >> >> >> flink1.11 
将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒 >> val chaitin_test = 
tableEnv.fromDataStream(dataStream,'test) >> chaitin_test.printSchema() >> 
tableEnv.executeSql("insert into chaitin_test select test from " + >> 
chaitin_test) > > > >-- >Best, Jingsong Lee

Re:Re: flink1.11流式写入hive速度慢的问题

2020-10-09 文章 Michael Ran
不知道现在flink 能否直接获取hive 文件写入。以前直接用jdbc 写hive 速度本来就快不起来,每次都要生成文件。  
如果先写文件,文件写好了再进行一次load  就会快很多
在 2020-10-09 15:55:15,"Jingsong Li"  写道:
>Hi,
>是Hive表吧?
>https://issues.apache.org/jira/browse/FLINK-19121 在1.11.3中才fix,这个是影响性能的
>
>可以下载最新的1.11分支的Hive依赖来试下:
>https://repository.apache.org/snapshots/org/apache/flink/
>(比如你用hive-1.2.2依赖,你可以下载
>https://repository.apache.org/content/groups/snapshots/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.11-SNAPSHOT/flink-sql-connector-hive-1.2.2_2.11-1.11-20201008.202441-190.jar
>)
>
>Best,
>Jingsong
>
>On Fri, Oct 9, 2020 at 3:50 PM me  wrote:
>
>> dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table
>>
>>
>>  原始邮件
>> 发件人: me
>> 收件人: user-zh
>> 发送时间: 2020年10月9日(周五) 15:34
>> 主题: flink1.11流式写入hive速度慢的问题
>>
>>
>> flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒
>> val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
>> chaitin_test.printSchema()
>> tableEnv.executeSql("insert into chaitin_test select test from " +
>> chaitin_test)
>
>
>
>-- 
>Best, Jingsong Lee


Re: flink1.11流式写入hive速度慢的问题

2020-10-09 文章 Jingsong Li
Hi,
是Hive表吧?
https://issues.apache.org/jira/browse/FLINK-19121 在1.11.3中才fix,这个是影响性能的

可以下载最新的1.11分支的Hive依赖来试下:
https://repository.apache.org/snapshots/org/apache/flink/
(比如你用hive-1.2.2依赖,你可以下载
https://repository.apache.org/content/groups/snapshots/org/apache/flink/flink-sql-connector-hive-1.2.2_2.11/1.11-SNAPSHOT/flink-sql-connector-hive-1.2.2_2.11-1.11-20201008.202441-190.jar
)

Best,
Jingsong

On Fri, Oct 9, 2020 at 3:50 PM me  wrote:

> dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table
>
>
>  原始邮件
> 发件人: me
> 收件人: user-zh
> 发送时间: 2020年10月9日(周五) 15:34
> 主题: flink1.11流式写入hive速度慢的问题
>
>
> flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒
> val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
> chaitin_test.printSchema()
> tableEnv.executeSql("insert into chaitin_test select test from " +
> chaitin_test)



-- 
Best, Jingsong Lee


转发:flink1.11流式写入hive速度慢的问题

2020-10-09 文章 me
dataStream读取速度在5000条每秒,没有其他的处理逻辑,仅仅是将datastream 转换为table


 原始邮件 
发件人: me
收件人: user-zh
发送时间: 2020年10月9日(周五) 15:34
主题: flink1.11流式写入hive速度慢的问题


flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒
val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
chaitin_test.printSchema()
tableEnv.executeSql("insert into chaitin_test select test from " + chaitin_test)

flink1.11流式写入hive速度慢的问题

2020-10-08 文章 me
flink1.11 将datastream转为table之后,再流式的写入hive表中,发现写入速度只有几十条每秒
val chaitin_test = tableEnv.fromDataStream(dataStream,'test)
chaitin_test.printSchema()
tableEnv.executeSql("insert into chaitin_test select test from " + chaitin_test)

回复:flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1

2020-09-29 文章 Jun Zhang
你的kafka的分区数是多少,把flink的并行度加大到kafka的分区数。



Best  Jun


-- 原始邮件 --
发件人: me 

flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1

2020-09-29 文章 me
flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1
tableEnv.executeSql("insert into dwd_security_log select * from " + table)
实际写入hive之后,查看hdfs上写入的文件为19M,这是60秒内写入hive的,flink流式写入hive通过checkpotin来把数据刷入hive中。


请问大家只有有什么提升写入速度的参数或者方式吗?

flink1.11提供的flink-webui中的日志界面的查看问题

2020-09-27 文章 赵一旦
今天发现jobmanager的日志可看,taskmanager日志不可以看(这个无所谓,因为我改了日志路径,所以看不到可能),但是有个loglist中列出了文件,这个文件点了却也看不到。


既然都给我列出了所有log文件了,为什么点了看不到呢?

而且看jobmanager的后台报错的化,报错是 tmp
目录的blob_xxx吧啦的文件不存在。不清楚这个看日志过程是怎样的,为什么需要拿tmp目录的blob_xxx,而不是直接从log目录下去拿文件呢?


回复: flink1.11连接mysql问题

2020-08-31 文章 酷酷的浑蛋
你是说让我修改mysql配置? 怎么可能允许我修改啊




在2020年09月1日 10:12,amen...@163.com 写道:
如果是mysql5.x以上的版本,url中autoReconnect参数会无效吧,

可以尝试下修改配置文件wait_timeout/interactive_out参数

best,
amenhub

发件人: 酷酷的浑蛋
发送时间: 2020-08-31 20:48
收件人: user-zh@flink.apache.org
主题: 回复: flink1.11连接mysql问题


下面是我连接mysql的配置,用的flink-1.11.1,还是报那个错误
CREATE TABLE xx(
`xx` varchar,
`xx` varchar
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx/xx?autoReconnect=true&failOverReadOnly=false',
'table-name' = ‘xx',
'driver' = 'com.mysql.jdbc.Driver',
'username' = ‘xx',
'password' = ‘xx',
'scan.partition.column' = 'id',
'scan.partition.num' = '50',
'scan.partition.lower-bound' = '500',
'scan.partition.upper-bound' = '1000',
'scan.fetch-size' = '100',
'lookup.cache.max-rows' = '5000',
'lookup.cache.ttl' = '10s'
);
在2020年08月31日 17:33,Leonard Xu 写道:


在 2020年8月28日,15:02,酷酷的浑蛋  写道:

com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem.




flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用



Hi

超时断开问题在1.11应该已经修复[1],你是怎么使用的?可以提供更多的信息吗

Best
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-16681 
<https://issues.apache.org/jira/browse/FLINK-16681>



Re: 回复: flink1.11连接mysql问题

2020-08-31 文章 amen...@163.com
如果是mysql5.x以上的版本,url中autoReconnect参数会无效吧,

可以尝试下修改配置文件wait_timeout/interactive_out参数

best,
amenhub
 
发件人: 酷酷的浑蛋
发送时间: 2020-08-31 20:48
收件人: user-zh@flink.apache.org
主题: 回复: flink1.11连接mysql问题
 
 
下面是我连接mysql的配置,用的flink-1.11.1,还是报那个错误
CREATE TABLE xx(
  `xx` varchar,
  `xx` varchar
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx/xx?autoReconnect=true&failOverReadOnly=false',
'table-name' = ‘xx',
'driver' = 'com.mysql.jdbc.Driver',
'username' = ‘xx',
'password' = ‘xx',
'scan.partition.column' = 'id',
'scan.partition.num' = '50',
'scan.partition.lower-bound' = '500',
'scan.partition.upper-bound' = '1000',
'scan.fetch-size' = '100',
'lookup.cache.max-rows' = '5000',
'lookup.cache.ttl' = '10s'
);
在2020年08月31日 17:33,Leonard Xu 写道:
 
 
在 2020年8月28日,15:02,酷酷的浑蛋  写道:
 
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem.
 
 
 
 
flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用
 
 
 
Hi
 
超时断开问题在1.11应该已经修复[1],你是怎么使用的?可以提供更多的信息吗
 
Best
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-16681 
<https://issues.apache.org/jira/browse/FLINK-16681>
 


回复: flink1.11连接mysql问题

2020-08-31 文章 酷酷的浑蛋


下面是我连接mysql的配置,用的flink-1.11.1,还是报那个错误
CREATE TABLE xx(
  `xx` varchar,
  `xx` varchar
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx/xx?autoReconnect=true&failOverReadOnly=false',
'table-name' = ‘xx',
'driver' = 'com.mysql.jdbc.Driver',
'username' = ‘xx',
'password' = ‘xx',
'scan.partition.column' = 'id',
'scan.partition.num' = '50',
'scan.partition.lower-bound' = '500',
'scan.partition.upper-bound' = '1000',
'scan.fetch-size' = '100',
'lookup.cache.max-rows' = '5000',
'lookup.cache.ttl' = '10s'
);
在2020年08月31日 17:33,Leonard Xu 写道:


在 2020年8月28日,15:02,酷酷的浑蛋  写道:

com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem.




flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用



Hi

超时断开问题在1.11应该已经修复[1],你是怎么使用的?可以提供更多的信息吗

Best
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-16681 




Re: flink1.11连接mysql问题

2020-08-31 文章 Leonard Xu


> 在 2020年8月28日,15:02,酷酷的浑蛋  写道:
> 
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
> successfully received from the server was 52,445,041 milliseconds ago. The 
> last packet sent successfully to the server was 52,445,045 milliseconds ago. 
> is longer than the server configured value of'wait_timeout'. You should 
> consider either expiring and/or testing connection validity before use in 
> your application, increasing the server configured values for client 
> timeouts, orusing the Connector/J connection property 'autoReconnect=true' to 
> avoid this problem.
> 
> 
> 
> 
> flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用
> 


Hi

超时断开问题在1.11应该已经修复[1],你是怎么使用的?可以提供更多的信息吗

Best
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-16681 




回复:flink1.11连接mysql问题

2020-08-31 文章 酷酷的浑蛋
关键是在sql中怎么设置,connector=jdbc




在2020年08月31日 15:06,13580506953<13580506...@163.com> 写道:
这个问题本质是连接活性问题,
连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)


建议使用连接池druid进行连接活性保持


原始邮件
发件人: 酷酷的浑蛋
收件人: user-zh
发送时间: 2020年8月28日(周五) 15:02
主题: flink1.11连接mysql问题


com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem. flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用

回复:flink1.11连接mysql问题

2020-08-31 文章 13580506953
这个问题本质是连接活性问题, 
连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)


建议使用连接池druid进行连接活性保持


 原始邮件 
发件人: 酷酷的浑蛋
收件人: user-zh
发送时间: 2020年8月28日(周五) 15:02
主题: flink1.11连接mysql问题


com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet 
successfully received from the server was 52,445,041 milliseconds ago. The last 
packet sent successfully to the server was 52,445,045 milliseconds ago. is 
longer than the server configured value of'wait_timeout'. You should consider 
either expiring and/or testing connection validity before use in your 
application, increasing the server configured values for client timeouts, 
orusing the Connector/J connection property 'autoReconnect=true' to avoid this 
problem. flink连接mysql,在过了一定时间后,就断开连接了,设置'autoReconnect=true’也不管用

回复: flink1.11 流式读取hive怎么设置 process_time 和event_time?

2020-08-30 文章 sllence
Hi Zou Dan:

可以尝试下立刻语句是否可行
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

CREATE TABLE Orders (
user BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH ( 
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
-- Add watermark definition
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
-- Overwrite the startup-mode
'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

-邮件原件-
发件人: me  
发送时间: 2020年8月30日 22:16
收件人: user-zh 
抄送: zoudanx 
主题: Re: flink1.11 流式读取hive怎么设置 process_time 和event_time? 

如果是直接连接的hive catalog呢,是hive中已存在的表,直接去流式的连接读取?
您那有什么可解决的想法吗?


 原始邮件 
发件人: Zou Dan
收件人: user-zh
发送时间: 2020年8月30日(周日) 21:55
主题: Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?


Event time 是通过 DDL 中 watermark 语句设置的,具体可以参考文档 [1] [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
 
<">https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table>
 Best, Dan Zou > 2020年8月30日 下午9:42,me  写道: > > flink1.11 
可以使用在使用select语句时,显式的指定是流式读取,流式的读出出来之后如果想使用实时计算中的特性窗口函数然后指定时间语义 
事件时间和处理时间,但是flink sql需要显示的定义数据中的时间字段才能识别为 event_time,求问这个怎么去设置。


Re: flink1.11连接mysql问题

2020-08-30 文章 Danny Chan
这个问题已经有 issue 在追踪了 [1]

[1] https://issues.apache.org/jira/browse/FLINK-12494

Best,
Danny Chan
在 2020年8月28日 +0800 PM3:02,user-zh@flink.apache.org,写道:
>
> CommunicationsException


Re: flink1.11时间函数

2020-08-30 文章 Danny Chan
对应英文的 deterministic function 可以更好理解些 ~

Best,
Danny Chan
在 2020年8月29日 +0800 PM6:23,Dream-底限 ,写道:
> 哦哦,好吧,我昨天用NOW的时候直接报错告诉我这是个bug,让我提交issue,我以为这种标示的都是函数功能有问题的
>
> Benchao Li  于2020年8月28日周五 下午8:01写道:
>
> > 不确定的意思是,这个函数的返回值是动态的,每次调用返回可能不同。
> > 对应的是确定性函数,比如concat就是确定性函数,只要输入是一样的,它的返回值就永远都是一样的。
> > 这个函数是否是确定性的,会影响plan的过程,比如是否可以做express reduce,是否可以复用表达式结果等。
> >
> > Dream-底限  于2020年8月28日周五 下午2:50写道:
> >
> > > hi
> > >
> > > UNIX_TIMESTAMP()
> > >
> > > NOW()
> > >
> > > 我这面想使用flink的时间戳函数,但是看官方文档对这两个函数描述后面加了一个此功能不确定,这个此功能不确定指的是这两个时间函数不能用吗
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >


Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?

2020-08-30 文章 Rui Li
Hi,

这个场景目前还是不支持的。定义watermark需要在DDL里做,hive表本身没有这个概念,所以DDL里定义不了。以后也许可以通过额外的参数来指定watermark。

On Sun, Aug 30, 2020 at 10:16 PM me  wrote:

> 如果是直接连接的hive catalog呢,是hive中已存在的表,直接去流式的连接读取?
> 您那有什么可解决的想法吗?
>
>
>  原始邮件
> 发件人: Zou Dan
> 收件人: user-zh
> 发送时间: 2020年8月30日(周日) 21:55
> 主题: Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?
>
>
> Event time 是通过 DDL 中 watermark 语句设置的,具体可以参考文档 [1] [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
> <">
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table>
> Best, Dan Zou > 2020年8月30日 下午9:42,me  写道: > > flink1.11
> 可以使用在使用select语句时,显式的指定是流式读取,流式的读出出来之后如果想使用实时计算中的特性窗口函数然后指定时间语义
> 事件时间和处理时间,但是flink sql需要显示的定义数据中的时间字段才能识别为 event_time,求问这个怎么去设置。



-- 
Best regards!
Rui Li


Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?

2020-08-30 文章 me
如果是直接连接的hive catalog呢,是hive中已存在的表,直接去流式的连接读取?
您那有什么可解决的想法吗?


 原始邮件 
发件人: Zou Dan
收件人: user-zh
发送时间: 2020年8月30日(周日) 21:55
主题: Re: flink1.11 流式读取hive怎么设置 process_time 和event_time?


Event time 是通过 DDL 中 watermark 语句设置的,具体可以参考文档 [1] [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table
 
<">https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table>
 Best, Dan Zou > 2020年8月30日 下午9:42,me  写道: > > flink1.11 
可以使用在使用select语句时,显式的指定是流式读取,流式的读出出来之后如果想使用实时计算中的特性窗口函数然后指定时间语义 
事件时间和处理时间,但是flink sql需要显示的定义数据中的时间字段才能识别为 event_time,求问这个怎么去设置。

  1   2   3   >