flink 1.12 中如何读取 mysql datetime 字段
在 mysql 中创建表 CREATE TABLE `p_port_packet_loss_5m` ( `id` binary(16) NOT NULL, `coltime` datetime NOT NULL, ... 在flink 中创建表 create table if not exists p_port_packet_loss_5m ( id bytes, coltime timestamp, ...) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://ip:port/mydatabase', 在flink sql 客户端执行 select * from p_port_packet_loss_5m; 总是报错 java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp 改了若干种数据类型都不行,这种情况该如何处理呢? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink编译报错
mvn clean install -T 4C -Pfast -DskipTests -Dcheckstyle.skip=true -DnpmRegistryURL=https://registry.npm.taobao.org 可以用这个试试 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 是否可以 hive 流 join hive 流?
还没有,你可以关注下这个issue[1] 祝好, Leonard [1] https://issues.apache.org/jira/browse/FLINK-21183 > 在 2021年2月1日,13:29,macdoor 写道: > > 当前的 1.13-snapshot 支持了吗?我可以试试吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于配置关联初始化方案的实现问题
官网上没有,在github上https://github.com/ververica/flink-cdc-connectors ``` SourceFunction sourceFunction = MySQLSource.builder() .hostname("localhost") .port(3306) .databaseList("inventory") // monitor all tables under inventory database .username("flinkuser") .password("flinkpw") .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String .build(); ``` 在 2021年2月1日 14:07,赵一旦 写道: 我没发现官网没看到DataStream有cdc的connector貌似,你是怎么搞的呢。 javenjiangfsof 于2021年2月1日周一 下午1:40写道: > DataStream API,像下面这样 > ``` > val list = ... //i use jdbc to get the init data > val dimensionInitStream = env.fromCollection(list) > val dimension = > dimensionStream.union(dimensionInitStream).broadcast(descriptor) > mainStream.connect(dimensionStream) > ... > ``` > 注:我怕没描述清楚问题:问题在于主流和广播流关联的时候,广播流不一定会先于主流到,导致关联不到配置 > > 在 2021年2月1日 13:30,赵一旦 写道: > > > FlinkSQL ? javenjiangfsof 于2021年2月1日周一 > 上午11:40写道: > Hi 社区的各位 > > > 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc > > + > > broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案: > > 1.初始化通过jdbc获取,通过fromCollection处理后,union > > cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么… > > > > 2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…) > > 3.更好的方案??? > > > 目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ > > liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举? > 希望能看到各位的回复,感谢
Re: 关于配置关联初始化方案的实现问题
我没发现官网没看到DataStream有cdc的connector貌似,你是怎么搞的呢。 javenjiangfsof 于2021年2月1日周一 下午1:40写道: > DataStream API,像下面这样 > ``` > val list = ... //i use jdbc to get the init data > val dimensionInitStream = env.fromCollection(list) > val dimension = > dimensionStream.union(dimensionInitStream).broadcast(descriptor) > mainStream.connect(dimensionStream) > ... > ``` > 注:我怕没描述清楚问题:问题在于主流和广播流关联的时候,广播流不一定会先于主流到,导致关联不到配置 > > 在 2021年2月1日 13:30,赵一旦 写道: > > > FlinkSQL ? javenjiangfsof 于2021年2月1日周一 > 上午11:40写道: > Hi 社区的各位 > > > 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc > > + > > broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案: > > 1.初始化通过jdbc获取,通过fromCollection处理后,union > > cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么… > > > > 2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…) > > 3.更好的方案??? > > > 目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ > > liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举? > 希望能看到各位的回复,感谢
Re: 是否可以 hive 流 join hive 流?
当前的 1.13-snapshot 支持了吗?我可以试试吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于配置关联初始化方案的实现问题
DataStream API,像下面这样 ``` val list = ... //i use jdbc to get the init data val dimensionInitStream = env.fromCollection(list) val dimension = dimensionStream.union(dimensionInitStream).broadcast(descriptor) mainStream.connect(dimensionStream) ... ``` 注:我怕没描述清楚问题:问题在于主流和广播流关联的时候,广播流不一定会先于主流到,导致关联不到配置 在 2021年2月1日 13:30,赵一旦 写道: FlinkSQL ? javenjiangfsof 于2021年2月1日周一 上午11:40写道: > Hi 社区的各位 > > 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc > + > broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案: > 1.初始化通过jdbc获取,通过fromCollection处理后,union > cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么… > > 2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…) > 3.更好的方案??? > > > 目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ > liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举? > 希望能看到各位的回复,感谢
Re: 关于配置关联初始化方案的实现问题
FlinkSQL ? javenjiangfsof 于2021年2月1日周一 上午11:40写道: > Hi 社区的各位 > > 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc > + > broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案: > 1.初始化通过jdbc获取,通过fromCollection处理后,union > cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么… > > 2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…) > 3.更好的方案??? > > > 目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ > liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举? > 希望能看到各位的回复,感谢
Re: 是否可以 hive 流 join hive 流?
Okay, 和我理解的一样,这个时间上是 event time, 基于event time的 interval join 需要定义watermark,目前hive表还不支持定义watermark,1.13应该会支持。 > 在 2021年2月1日,10:58,macdoor 写道: > > p1.time 是数据记录里的时间,也用这个时间做分区 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: 水印的作用请教
很详尽了,非常感谢 @tison ! 发件人: tison 发送时间: 2021-02-01 11:43 收件人: user-zh 主题: Re: Re: 水印的作用请教 对于 StreamingFileSink 可以查看这两份资料 https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html https://stackoverflow.com/questions/54763160/is-it-possible-that-bucketing-sink-create-bucket-on-event-time 默认的,watermark 和 (EventTime) Timestamp 信息会带到 BucketAssigner 实现所需的 Context 里,Flink 没有开箱即用的基于 EventTime 的分桶策略,你需要自己尝试实现。比水印晚的数据,可以自行实现为丢弃或追加到原有分区文件上。 对于 SQL 可以查看这份资料 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html 目前看起来开箱的逻辑迟到数据会追加而不是丢弃。有一些相关的配置可以调整 commit 也就是落盘的时机,但不影响落盘的数据。 Best, tison. amenhub 于2021年2月1日周一 上午11:07写道: > StreamAPI使用的是StreamingFileSink,SQL就是FileSystem了 > > > > > 发件人: tison > 发送时间: 2021-02-01 11:01 > 收件人: user-zh > 主题: Re: Re: 水印的作用请教 > 请问你使用哪种 SinkConnector 写入 HDFS 呢? > > Best, > tison. > > > amenhub 于2021年2月1日周一 上午10:58写道: > > > >>> > > > 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? > > 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理? > > > > > > > > > > 发件人: amenhub > > 发送时间: 2021-02-01 10:44 > > 收件人: user-zh > > 主题: Re: Re: 水印的作用请教 > > 谢谢回复! > > > > 也就是说如果我利用Flink从Kafka (Select > > *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗? > > > > best, > > amenhub > > > > > > > > 发件人: tison > > 发送时间: 2021-02-01 10:36 > > 收件人: user-zh > > 主题: Re: 水印的作用请教 > > 取决于你的计算流图,watermark 通常只在以下情况有实际作用 > > True > > & cond 1. 使用 EventTime > > & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer > > Best, > > tison. > > amenhub 于2021年2月1日周一 上午10:26写道: > > > hi everyone, > > > > > > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。 > > > > > > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。 > > > 那么, > > > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算? > > > > > > > > > 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? > > > > > > best, > > > amenhub > > > > > > > > > > > > > > >
Re: [ANNOUNCE] Apache Flink 1.10.3 released
Thanks Xintong for being the release manager and everyone who helped with the release! Cheers, Zhu Dian Fu 于2021年1月29日周五 下午5:56写道: > Thanks Xintong for driving this release! > > Regards, > Dian > > 在 2021年1月29日,下午5:24,Till Rohrmann 写道: > > Thanks Xintong for being our release manager. Well done! > > Cheers, > Till > > On Fri, Jan 29, 2021 at 9:50 AM Yang Wang wrote: > >> Thanks Xintong for driving this release. >> >> Best, >> Yang >> >> Yu Li 于2021年1月29日周五 下午3:52写道: >> >>> Thanks Xintong for being our release manager and everyone else who made >>> the release possible! >>> >>> Best Regards, >>> Yu >>> >>> >>> On Fri, 29 Jan 2021 at 15:05, Xintong Song wrote: >>> The Apache Flink community is very happy to announce the release of Apache Flink 1.10.3, which is the third bugfix release for the Apache Flink 1.10 series. Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications. The release is available for download at: https://flink.apache.org/downloads.html Please check out the release blog post for an overview of the improvements for this bugfix release: https://flink.apache.org/news/2021/01/29/release-1.10.3.html The full release notes are available in Jira: https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12348668 We would like to thank all contributors of the Apache Flink community who made this release possible! Regards, Xintong Song >>> >
Re: Re: 水印的作用请教
对于 StreamingFileSink 可以查看这两份资料 https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html https://stackoverflow.com/questions/54763160/is-it-possible-that-bucketing-sink-create-bucket-on-event-time 默认的,watermark 和 (EventTime) Timestamp 信息会带到 BucketAssigner 实现所需的 Context 里,Flink 没有开箱即用的基于 EventTime 的分桶策略,你需要自己尝试实现。比水印晚的数据,可以自行实现为丢弃或追加到原有分区文件上。 对于 SQL 可以查看这份资料 https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html 目前看起来开箱的逻辑迟到数据会追加而不是丢弃。有一些相关的配置可以调整 commit 也就是落盘的时机,但不影响落盘的数据。 Best, tison. amenhub 于2021年2月1日周一 上午11:07写道: > StreamAPI使用的是StreamingFileSink,SQL就是FileSystem了 > > > > > 发件人: tison > 发送时间: 2021-02-01 11:01 > 收件人: user-zh > 主题: Re: Re: 水印的作用请教 > 请问你使用哪种 SinkConnector 写入 HDFS 呢? > > Best, > tison. > > > amenhub 于2021年2月1日周一 上午10:58写道: > > > >>> > > > 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? > > 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理? > > > > > > > > > > 发件人: amenhub > > 发送时间: 2021-02-01 10:44 > > 收件人: user-zh > > 主题: Re: Re: 水印的作用请教 > > 谢谢回复! > > > > 也就是说如果我利用Flink从Kafka (Select > > *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗? > > > > best, > > amenhub > > > > > > > > 发件人: tison > > 发送时间: 2021-02-01 10:36 > > 收件人: user-zh > > 主题: Re: 水印的作用请教 > > 取决于你的计算流图,watermark 通常只在以下情况有实际作用 > > True > > & cond 1. 使用 EventTime > > & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer > > Best, > > tison. > > amenhub 于2021年2月1日周一 上午10:26写道: > > > hi everyone, > > > > > > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。 > > > > > > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。 > > > 那么, > > > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算? > > > > > > > > > 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? > > > > > > best, > > > amenhub > > > > > > > > > > > > > > >
关于配置关联初始化方案的实现问题
Hi 社区的各位 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc + broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案: 1.初始化通过jdbc获取,通过fromCollection处理后,union cdc过来的配置流,这个经过测试确实可以解决,但是会有疑惑:通过fromCollection处理的内存的数据一定会比kafka,cdc过来的数据快吗?为什么… 2.通过ListState(有看到keyed流的解决方案,但是这里我并不需要keyby,keyby之后的数据很不均匀,还会产生很多partition,还会有shuffle损耗等问题),但是这会涉及到何时触发缓存中数据的触发关联,以及清理状态的问题??(如果能给出例子是最好的…) 3.更好的方案??? 目前来看我更倾向于与第一种方案,实现也很简单但是那个问题我无法确定,或许我应该考虑union+ liststate,jdbc过来的数据最后添加标志位做触发关联,会不会多此一举? 希望能看到各位的回复,感谢
Re: 请问pyflink如何跟kerberos认证的kafka对接呢
Hi, 看你之前发的邮件,你现在是把kerberos相关的配置放在某一个flink-conf.yaml里,然后启动了一个local模式吧? 但是local模式的pyflink shell是不会主动读取任何flink-conf.yaml的。需要配置环境变量FLINK_HOME,将相关配置写入$FLINK_HOME/conf/flink-conf.yaml里,并且只有在提交job时候(flink run、remote模式或者yarn模式)才会去读取flink-conf.yaml里的内容。 如果执意要在local模式下尝试,可以通过以下代码: from pyflink.java_gateway import get_gateway System = get_gateway().jvm.System 拿到java中的System对象,然后按照java中的方式进行配置。 > 在 2021年1月30日,13:58,瞿叶奇 <389243...@qq.com> 写道: > > 附测试程序,希望老师给出解决方法。测试时发现只更新csv文件的修改日期,但没有实际数据录入。怀疑kafka连接还存在问题。 > #!/usr/bin/python3.7 > # -*- coding: UTF-8 -*- > from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode > from pyflink.table import StreamTableEnvironment, TableConfig, DataTypes, > CsvTableSink, WriteMode, SqlDialect > from pyflink.table.descriptors import FileSystem,OldCsv,Schema,Kafka,Json > s_env = StreamExecutionEnvironment.get_execution_environment() > s_env.set_parallelism(1) > s_env.enable_checkpointing(3000) > st_env = StreamTableEnvironment.create(s_env, TableConfig()) > st_env.use_catalog("default_catalog") > st_env.use_database("default_database") > st_env.connect(Kafka().version("universal").topic("qyq13").start_from_earliest().property("zookeeper.connect", > > "192.168.0.120:24002,192.168.0.238:24002,192.168.0.6:24002").property("security.protocol", > 'SASL_PLAINTEXT').property("sasl.kerberos.service.name", > 'kafka').property("kerberos.domain.name", > 'hadoop.hadoop.com').property("bootstrap.servers", > "192.168.0.151:21007,192.168.0.29:21007,192.168.0.93:21007")).with_format(Json().fail_on_missing_field(False).schema(DataTypes.ROW([DataTypes.FIELD("id", > DataTypes.BIGINT()),DataTypes.FIELD("name", > DataTypes.STRING())]))).with_schema(Schema().field("id", > DataTypes.BIGINT()).field("name", > DataTypes.STRING())).register_table_source("sourceKafka") > fieldNames = ["id", "name"] > fieldTypes = [DataTypes.BIGINT(),DataTypes.STRING()] > csvSink = CsvTableSink(fieldNames, fieldTypes, "/tmp/result2021.csv", ",", 1, > WriteMode.OVERWRITE) > st_env.register_table_sink("csvTableSink", csvSink) > resultQuery = st_env.sql_query("select id,name from sourceKafka") > resultQuery = resultQuery.insert_into("csvTableSink") > st_env.execute("pyflink-kafka-v2")
Re: Re: 水印的作用请教
StreamAPI使用的是StreamingFileSink,SQL就是FileSystem了 发件人: tison 发送时间: 2021-02-01 11:01 收件人: user-zh 主题: Re: Re: 水印的作用请教 请问你使用哪种 SinkConnector 写入 HDFS 呢? Best, tison. amenhub 于2021年2月1日周一 上午10:58写道: > >>> > 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? > 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理? > > > > > 发件人: amenhub > 发送时间: 2021-02-01 10:44 > 收件人: user-zh > 主题: Re: Re: 水印的作用请教 > 谢谢回复! > > 也就是说如果我利用Flink从Kafka (Select > *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗? > > best, > amenhub > > > > 发件人: tison > 发送时间: 2021-02-01 10:36 > 收件人: user-zh > 主题: Re: 水印的作用请教 > 取决于你的计算流图,watermark 通常只在以下情况有实际作用 > True > & cond 1. 使用 EventTime > & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer > Best, > tison. > amenhub 于2021年2月1日周一 上午10:26写道: > > hi everyone, > > > > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。 > > > > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。 > > 那么, > > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算? > > > > > 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? > > > > best, > > amenhub > > > > > > > > >
Re: 是否可以 hive 流 join hive 流?
p1.time 是数据记录里的时间,也用这个时间做分区 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: 水印的作用请教
请问你使用哪种 SinkConnector 写入 HDFS 呢? Best, tison. amenhub 于2021年2月1日周一 上午10:58写道: > >>> > 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? > 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理? > > > > > 发件人: amenhub > 发送时间: 2021-02-01 10:44 > 收件人: user-zh > 主题: Re: Re: 水印的作用请教 > 谢谢回复! > > 也就是说如果我利用Flink从Kafka (Select > *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗? > > best, > amenhub > > > > 发件人: tison > 发送时间: 2021-02-01 10:36 > 收件人: user-zh > 主题: Re: 水印的作用请教 > 取决于你的计算流图,watermark 通常只在以下情况有实际作用 > True > & cond 1. 使用 EventTime > & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer > Best, > tison. > amenhub 于2021年2月1日周一 上午10:26写道: > > hi everyone, > > > > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。 > > > > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。 > > 那么, > > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算? > > > > > 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? > > > > best, > > amenhub > > > > > > > > >
Re: Re: 水印的作用请教
>>> 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? 另外还是比较想了解,当指定水印及延迟的时候,比水印晚的数据,在流写HDFS的时候会怎么处理? 发件人: amenhub 发送时间: 2021-02-01 10:44 收件人: user-zh 主题: Re: Re: 水印的作用请教 谢谢回复! 也就是说如果我利用Flink从Kafka (Select *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗? best, amenhub 发件人: tison 发送时间: 2021-02-01 10:36 收件人: user-zh 主题: Re: 水印的作用请教 取决于你的计算流图,watermark 通常只在以下情况有实际作用 True & cond 1. 使用 EventTime & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer Best, tison. amenhub 于2021年2月1日周一 上午10:26写道: > hi everyone, > > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。 > > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。 > 那么, > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算? > > 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? > > best, > amenhub > > > >
Re: flink sql时区问题
嗯,flink 中 很多时间函数比如PROCTIME()/CURRENT_TIMESTAMP 返回的值都是 UTC+0的时间值,这里的timezone设置对这些函数不生效的,这些函数是有点时区问题的, 目前只能在代码里通过加减时区偏移绕过。 > 在 2021年2月1日,10:50,沉醉寒風 <1039601...@qq.com> 写道: > > 在代码中这样设置 streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8")) > 也不管用. 还是要自己手动去加减时间才能做到,方法比较笨, > > > > > --原始邮件-- > 发件人: > "user-zh" > > 发送时间:2021年2月1日(星期一) 上午10:46 > 收件人:"user-zh" > 主题:Re: flink sql时区问题 > > > > Hi, > 时区不生效在你的代码中是体现在那些地方呀?目前flink sql是有些时区问题,社区也希望在1.13能解决掉。 > > > 在 2021年2月1日,10:42,沉醉寒風 <1039601...@qq.com 写道: > > streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))
?????? flink sql????????
streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8")) . ,??, ---- ??: "user-zh"
Re: flink sql时区问题
Hi, 时区不生效在你的代码中是体现在那些地方呀?目前flink sql是有些时区问题,社区也希望在1.13能解决掉。 > 在 2021年2月1日,10:42,沉醉寒風 <1039601...@qq.com> 写道: > > streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))
Re: 是否可以 hive 流 join hive 流?
Hi,macdoor 很有意思的case,p1.time字段是你记录里的时间吗? 你hive表的分区字段和这个时间字段的关系是怎么样的呀? > 在 2021年1月30日,17:54,macdoor 写道: > > 具体需求是这样,采集取得的通道总流量5分钟一次存入 hive 表,为了取得 5 分钟内该通道的流量,需要前后2次采集到的总流量相减,我想用同一个 hive > 表自己相互 join,形成 2 个 hive 流 join,不知道是否可以实现?或者有其他实现方法吗? > 我现在使用 crontab 定时 batch 模式做,希望能改成 stream 模式 > > select p1.traffic -p2.traffic > from p as p1 > inner join p as p2 on p1.id=p2.id and p1.time=p2.time + interval 5 minutes > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: 水印的作用请教
谢谢回复! 也就是说如果我利用Flink从Kafka (Select *)采集数据到HDFS,不涉及Timer触发逻辑,使用水印的目的就只是为了使用事件时间以及HDFS中基于事件时间进行分区目录创建,对吗? best, amenhub 发件人: tison 发送时间: 2021-02-01 10:36 收件人: user-zh 主题: Re: 水印的作用请教 取决于你的计算流图,watermark 通常只在以下情况有实际作用 True & cond 1. 使用 EventTime & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer Best, tison. amenhub 于2021年2月1日周一 上午10:26写道: > hi everyone, > > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。 > > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。 > 那么, > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算? > > 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? > > best, > amenhub > > > >
flink sql????????
streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))
flink sql ????????????
flink sql+8,??
Re: 水印的作用请教
取决于你的计算流图,watermark 通常只在以下情况有实际作用 True & cond 1. 使用 EventTime & cond 2. 流图中有 Timer 触发逻辑,例如 Window 算子或自定义 Timer Best, tison. amenhub 于2021年2月1日周一 上午10:26写道: > hi everyone, > > 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。 > > 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。 > 那么, > 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算? > > 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? > > best, > amenhub > > > >
Re: 咨询求助
> 在 2021年1月31日,20:15,Appleyuchi 写道: > > 一点小小的建议哈, > 目前flink社区讨论主要还是java/scala为主, > 如果执意使用pyflink的话,后续极有可能会遇到较大的阻力. 我理解这种较大阻力应该不存在的,社区里pyflink的投入还挺大的,蛮多开发者的,我也cc两位在这块做贡献的社区开发者,从JIRA上看pyflink相关的开发进度都挺快的。 如果有机器学习,python相关的经验,用pyflink我觉得挺合适的。 祝好, Leonard
水印的作用请教
hi everyone, 最近在使用Stream API或Table API编写任务,接Kafka数据流写HDFS的时候,关于水印有几个问题想请教社区帮忙答疑。 在我的理解中,水印本意是容忍事件时间的延迟(乱序程序),在不能容忍的时候触发窗口计算,以达到输出该窗口结果的目的。 那么, 1.在Kafka入HDFS的过程中,水印的作用具体是什么呢?貌似无窗口计算? 2.如果我在SQL或StreamAPI中使用ForBoundedOutOfOrderness中基于事件时间设置允许5分钟延迟的水印,那么Kafka各分区最小时间的水印到达后,比水印晚的数据还会写入HDFS(基于事件时间分区)对应的分区中吗?是丢弃?还是不丢弃它写入对应分区后再重新幂等提交该分区? best, amenhub
?????? ??????????????????????????????????watermark,????????????????????????watermark???????????????1??
, ---- ??: "user-zh"
Re: Re: 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了
你好,图片在上传在附件里面了 hezongji...@qq.com 发件人: tison 发送时间: 2021-02-01 09:31 收件人: user-zh 主题: Re: 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了 邮件列表不支持直接粘贴图片,请尝试使用附件或 gist 等方式共享。 Best, tison. hezongji...@qq.com 于2021年2月1日周一 上午9:28写道: > 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了 > 代码如下: > > 运行结果如下: > -- > hezongji...@qq.com >
Re: 请教 flink 1.12.1 插入 mysql 有时耗时很长
有时候这种job持续2个多小时,我只能cancel job,但无法正常 cancel,都会导致 taskmanager 挂掉,错误如下 2021-01-31 23:04:23,677 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds. org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds. at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.11-1.12.1.jar:1.12.1] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] 2021-01-31 23:04:23,685 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down... org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds. at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1685) [flink-dist_2.11-1.12.1.jar:1.12.1] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282] 2021-01-31 23:04:23,686 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Stopping TaskExecutor akka.tcp://flink@10.13.69.52:45901/user/rpc/taskmanager_0. 2021-01-31 23:04:23,686 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close ResourceManager connection 1bd159f361d86e77d17e261ab44b5128. 2021-01-31 23:04:23,689 WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: HiveSource-snmpprobe.p_port_traffic_5m -> Calc(select=[binaryid AS id, ver, CAST(2021-01-31 21:45:00:TIMESTAMP(6)) AS coltime, CAST(in_octets) AS in_octets, CAST(out_octets) AS out_octets, CAST(bi_octets) AS bi_octets, CAST(unimax_octets) AS unimax_octets, in_speed, out_speed, bi_speed, unimax_speed, in_util, out_util, bi_util, unimax_util, inout_ratio, bandwidth, origin, CAST((() DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss')) AS crtime], where=[(coltime = 2021-01-31 21:45:00:TIMESTAMP(9))]) -> Sink: Sink(table=[myhive.prod_mysql_zqzynetdb.p_port_traffic_5m], fields=[id, ver, coltime, in_octets, out_octets, bi_octets, unimax_octets, in_speed, out_speed, bi_speed, unimax_speed, in_util, out_util, bi_util, unimax_util, inout_ratio, bandwidth, origin, crtime]) (1/1)#0' did not react to cancelling signal for 30 seconds, but is stuck in method: java.net.SocketInputStream.socketRead0(Native Method) java.net.SocketInputStream.socketRead(SocketInputStream.java:116) java.net.SocketInputStream.read(SocketInputStream.java:171) java.net.SocketInputStream.read(SocketInputStream.java:141) com.mysql.cj.protocol.ReadAheadInputStream.fill(ReadAheadInputStream.java:107) com.mysql.cj.protocol.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:150) com.mysql.cj.protocol.ReadAheadInputStream.read(ReadAheadInputStream.java:180) java.io.FilterInputStream.read(FilterInputStream.java:133) com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64) com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:63) com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:45) com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:52) com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:41) com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:54) com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:44) com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:538) com.mysql.cj.protocol.a.NativeProtocol.checkErrorMessage(NativeProtocol.java:708) com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:647) com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:946) com.mysql.cj.NativeSession.execSQL(NativeSession.java:1075) com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:930) com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092) com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832) com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435) com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796) org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216) org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184) org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
Re: 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了
邮件列表不支持直接粘贴图片,请尝试使用附件或 gist 等方式共享。 Best, tison. hezongji...@qq.com 于2021年2月1日周一 上午9:28写道: > 为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了 > 代码如下: > > 运行结果如下: > -- > hezongji...@qq.com >
Re: 请教 flink 1.12.1 插入 mysql 有时耗时很长
打开了 debug 级别的日志,有这样的错误 2021-01-31 20:45:30,364 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager [] - Released partition dc8a2804b6df6b0ceaee2610ccf6c6e5#312 produced by 448c5ac36dcda818f56ec5bbd728da10. 2021-01-31 20:45:30,392 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Free slot with allocation id 80a1592c9e59efd80e412e7dee99f70c because: Stopping JobMaster for job ifXTable->p_port_traffic_5m @2021-01-31 20:30:00(d055754b88483b13648cc3fb32d9cd58). 2021-01-31 20:45:30,392 DEBUG org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1., taskHeapMemory=344.000mb (360710140 bytes), taskOffHeapMemory=0 bytes, managedMemory=256.000mb (268435460 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationId: 80a1592c9e59efd80e412e7dee99f70c, jobId: d055754b88483b13648cc3fb32d9cd58). org.apache.flink.util.FlinkException: Stopping JobMaster for job ifXTable->p_port_traffic_5m @2021-01-31 20:30:00(d055754b88483b13648cc3fb32d9cd58). at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:416) ~[flink-dist_2.11-1.12.1.jar:1.12.1] at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214) ~[flink-dist_2.11-1.12.1.jar:1.12.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:187) ~[flink-dist_2.11-1.12.1.jar:1.12.1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.12.1.jar:1.12.1] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.12.1.jar:1.12.1] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.12.1.jar:1.12.1] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.12.1.jar:1.12.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12.1.jar:1.12.1] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12.1.jar:1.12.1] at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12.1.jar:1.12.1] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12.1.jar:1.12.1] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12.1.jar:1.12.1] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12.1.jar:1.12.1] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12.1.jar:1.12.1] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12.1.jar:1.12.1] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12.1.jar:1.12.1] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12.1.jar:1.12.1] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12.1.jar:1.12.1] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12.1.jar:1.12.1] 2021-01-31 20:45:30,393 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Releasing local state under allocation id 80a1592c9e59efd80e412e7dee99f70c. 2021-01-31 20:45:30,393 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Free slot with allocation id 4606a6194b4380efb5c2f95fc65bf01e because: Stopping JobMaster for job ifXTable->p_port_traffic_5m @2021-01-31 20:30:00(d055754b88483b13648cc3fb32d9cd58). 2021-01-31 20:45:30,393 DEBUG org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:12, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1., taskHeapMemory=344.000mb (360710140 bytes), taskOffHeapMemory=0 bytes, managedMemory=256.000mb (268435460 bytes), networkMemory=32.000mb (33554432 bytes)}, allocationId: 4606a6194b4380efb5c2f95fc65bf01e, jobId: d055754b88483b13648cc3fb32d9cd58). org.apache.flink.util.FlinkException: Stopping JobMaster for job ifXTable->p_port_traffic_5m @2021-01-31 20:30:00(d055754b88483b13648cc3fb32d9cd58). at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:416) ~[flink-dist_2.11-1.12.1.jar:1.12.1] at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214) ~[flink-dist_2.11-1.12.1.jar:1.12.1] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1] at
为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了
为什么每次打印的是上一条消息产生的watermark,不是最新的一条消息产生的watermark呢?并行度设置为1了 代码如下: 运行结果如下: hezongji...@qq.com
请教 flink 1.12.1 插入 mysql 有时耗时很长
周期性batch mode 从 hive 提取数据插入 mysql,每批次 10K 到 20K 行数据,多数情况下 10-20秒可以完成,但不定期就会很长时间,能达到 20多分钟,但也能成功,查看了日志也看不到错误,检查 mysql 也没有发现锁表,怀疑 hive metastore 的性能,但也没看出问题。 请教分析思路,从 flink 上能看出job 在等待什么吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re:咨询求助
一点小小的建议哈, 目前flink社区讨论主要还是java/scala为主, 如果执意使用pyflink的话,后续极有可能会遇到较大的阻力. 在 2021-01-31 14:26:55,"瞿叶奇" <389243...@qq.com> 写道: >您好,我是国网陕西采集系统开发人员,我们在架构改造中,准备使用pyflink >解决实时Kafka数据写HDFS的问题,我的Kafka集群存在kerberos安全认证,导致我现在还没连接上,能不能给一个样例呢?