Flink DataStream

2020-06-23 Thread xuhaiLong
Hi 请教一个问题 我需要对一个类似这样的数据进行计算获取用户 categoryId | userId | articleID | categoryId | score | | 01 | A | 1 | 10 | | 01 | B | 1 | 20 | | 01 | C | 2 | 30 | 目前我的实现是使用tableAPI 根据 UserId和categoryID 分组做 score 聚合 再通过状态做TopN排序,有没有其他更好的方案来实现? 我使用过 ROW_NUMBER() over() ,在 flink 1.10 上,并无法使用。分组聚合除了使用Table AP

Flink DataStream ????UV????

2020-07-07 Thread ?g???U?[????
     DataStream??apiUV??2 1Tumbling??1Time.days(1)??uv trigger 2???

Re: Flink DataStream

2020-06-23 Thread LakeShen
Hi xuhaiLong, 看下是否能够在 Flink SQL 中,通过自定义 UDTF 来满足你的需求。 Best, LakeShen xuhaiLong 于2020年6月23日周二 下午7:18写道: > Hi > > 请教一个问题 > > > 我需要对一个类似这样的数据进行计算获取用户 categoryId > | userId | articleID | categoryId | score | > | 01 | A | 1 | 10 | > | 01 | B | 1 | 20 | > | 01 | C | 2 | 30 | > > > > > 目前我的实现是使用table

Re: Flink DataStream

2020-06-23 Thread Jark Wu
Hi xuhaiLong, 1.10 blink planner 是支持 ROW_NUMBER() over 的 (配合 rownum <= N 使用)。你是不是用的 old planner 呢? Best, Jark On Tue, 23 Jun 2020 at 19:44, LakeShen wrote: > Hi xuhaiLong, > > 看下是否能够在 Flink SQL 中,通过自定义 UDTF 来满足你的需求。 > > Best, > LakeShen > > xuhaiLong 于2020年6月23日周二 下午7:18写道: > > > Hi > > > > 请

Re: Flink DataStream

2020-06-23 Thread xuhaiLong
使用的是1.10.1,在 table api 无法使用ROW_NUMBER On 6/23/2020 19:52,Jark Wu wrote: Hi xuhaiLong, 1.10 blink planner 是支持 ROW_NUMBER() over 的 (配合 rownum <= N 使用)。你是不是用的 old planner 呢? Best, Jark On Tue, 23 Jun 2020 at 19:44, LakeShen wrote: Hi xuhaiLong, 看下是否能够在 Flink SQL 中,通过自定义 UDTF 来满足你的需求。 Best, Lake

Re: Flink DataStream

2020-06-23 Thread Jark Wu
图片无法查看,你可以把图片上传到某图床,然后将链接贴这里。 On Tue, 23 Jun 2020 at 19:59, xuhaiLong wrote: > 使用的是1.10.1,在 table api 无法使用ROW_NUMBER > On 6/23/2020 19:52,Jark Wu wrote: > > Hi xuhaiLong, > > 1.10 blink planner 是支持 ROW_NUMBER() over 的 (配合 rownum <= N 使用)。你是不是用的 old > planner 呢? > > Best, > Jark > > On Tue, 23 J

Re: Flink DataStream

2020-06-23 Thread xuhaiLong
"org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1", "org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided", 看下粘贴的 sbt 依赖 On 6/23/2020 20:06,Jark Wu wrote: 图片无法查看,你可以把图片上传到某图床,然后将链接贴这里。 On Tue, 23 Jun 2020 at 19:59, xuhaiLong wrote: 使用的是1.10.1,在 table api 无法使用ROW_NUMBER

Re: Flink DataStream

2020-06-23 Thread LakeShen
Hi xuhaiLong, 看你的依赖,应该用的 old planner,你要使用 blink planner 才能使用row_number 函数。要使用 flink-table-planner-blink_2.11 具体文档参考: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/#table-program-dependencies Best, LakeShen xuhaiLong 于2020年6月23日周二 下午8:14写道: > "org.apache.flink" %% "flin

Re: Flink DataStream

2020-06-23 Thread xuhaiLong
是我的问题,引用了old planner。感谢! On 6/23/2020 21:05,LakeShen wrote: Hi xuhaiLong, 看你的依赖,应该用的 old planner,你要使用 blink planner 才能使用row_number 函数。要使用 flink-table-planner-blink_2.11 具体文档参考: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/#table-program-dependencies Best, LakeShen xuh

Flink DataStream Micro-Batch

2019-10-21 Thread 王佩
想在Flink DataStream中实现微批,可以做到吗。该如何实现? 比方说基于处理时间,从Kafka 消费数据,每隔5s产生一个Batch,然后对这个Batch处理。 感谢!

Flink DataStream Broadcast variable

2019-10-21 Thread 王佩
请教一下, Flink DataStream中有广播变量的概念吗。 比方说,我有一个全局变量ArrayList,在JobManager端更新,然后在TaskManager中共享。可以做到吗 感谢!

Re: Flink DataStream Micro-Batch

2019-10-22 Thread Utopia
用 Window 实现。 2019年10月22日 +0800 13:25 王佩 ,写道: > 想在Flink DataStream中实现微批,可以做到吗。该如何实现? > > 比方说基于处理时间,从Kafka 消费数据,每隔5s产生一个Batch,然后对这个Batch处理。 > > 感谢!

Re: Flink DataStream Broadcast variable

2019-10-22 Thread 18612537914
你好, https://www.jianshu.com/p/c8c99f613f10?from=singlemessage&isappinstalled=0 希望可以帮助你 > 在 2019年10月22日,下午2:04,王佩 写道: > > 请教一下, Flink DataStream中有广播变量的概念吗。 > > 比方说,我有一个全局变量ArrayList,在JobManager端更新,然后在TaskManager中共享。可以做到吗 > > 感谢!

Flink DataStream ElasticsearchSink Checkpoint Error

2019-10-22 Thread 王佩
Flink 写入 Elasticsearch,Checkpoint 一直处于IN_PROGRESS 状态,从而导致数据不能写入ES。 如图: [image: image.png] 帮忙看下! 感谢!

Flink DataStream KeyedStream 与 AggregateFunction

2019-11-08 Thread 王佩
请教下: 1、DataStream 如按用户ID KeyBy后,同一个用户ID的数据最终会被分到一个Partition中吗? 2、假设1成立,这样就会有数据倾斜的问题。该如何解决? 3、假设1成立,如: DataStream .keyBy(userID) .timeWindow() .aggregate(new AggregateFunction(...)),这里的AggregateFu

Re: Flink DataStream ElasticsearchSink Checkpoint Error

2019-10-24 Thread Congxian Qiu
你的图挂了,如果 checkpoint 有问题的话,可以先参考文章[1] 自查一下,如果还有问题,可以在邮件列表更新 [1] https://mp.weixin.qq.com/s/0jTVXa9ktyLynwvRelRYYg Best, Congxian 王佩 于2019年10月23日周三 上午8:38写道: > Flink 写入 Elasticsearch,Checkpoint 一直处于IN_PROGRESS 状态,从而导致数据不能写入ES。 > > 如图: > > [image: image.png] > > 帮忙看下! > 感谢! >

Re: Flink DataStream KeyedStream 与 AggregateFunction

2019-11-09 Thread Px New
mber 9, 2019 11:45 AM > 收件人: user-zh > 主题: Flink DataStream KeyedStream 与 AggregateFunction > > 请教下: > > 1、DataStream 如按用户ID KeyBy后,同一个用户ID的数据最终会被分到一个Partition中吗? > > 2、假设1成立,这样就会有数据倾斜的问题。该如何解决? > > 3、假设1成立,如: DataStream >.keyB

Re: Flink DataStream KeyedStream 与 AggregateFunction

2019-11-11 Thread Tony Wei
是否可以本地先聚合? >> 3,AggFunc是指定做何种聚合,是sum, 还是avg, 还是count。不指定的话,Flink哪里指导你要计算啥? >> >> -邮件原件- >> 发件人: 王佩 >> 发送时间: Saturday, November 9, 2019 11:45 AM >> 收件人: user-zh >> 主题: Flink DataStream KeyedStream 与 AggregateFunction >> >> 请教下: >>

Re: Flink DataStream KeyedStream 与 AggregateFunction

2019-11-11 Thread tison
oujun 于2019年11月9日周六 下午7:46写道: > >> 1, 是 >> 2,没有标准答案,是否可以本地先聚合? >> 3,AggFunc是指定做何种聚合,是sum, 还是avg, 还是count。不指定的话,Flink哪里指导你要计算啥? >> >> -邮件原件- >> 发件人: 王佩 >> 发送时间: Saturday, November 9, 2019 11:45 AM >> 收件人: user-zh >> 主题: Flink Data

回复: Flink DataStream KeyedStream 与 AggregateFunction

2019-11-09 Thread Yuan,Youjun
1, 是 2,没有标准答案,是否可以本地先聚合? 3,AggFunc是指定做何种聚合,是sum, 还是avg, 还是count。不指定的话,Flink哪里指导你要计算啥? -邮件原件- 发件人: 王佩 发送时间: Saturday, November 9, 2019 11:45 AM 收件人: user-zh 主题: Flink DataStream KeyedStream 与 AggregateFunction 请教下: 1、DataStream 如按用户ID KeyBy后,同一个用户ID的数据最终会被分到一个Partition中吗? 2、假设1成立,这样就

Re: Flink DataStream 统计UV问题

2020-07-09 Thread shizk233
Hi Jiazhi, 1.如果数据流量不是很大的话,按每条数据触发也没问题。另外,基于事件时间的情况,提前触发可以选择ContinuousEventTimeTrigger,可以查看Trigger接口的实现找到你想要的trigger。 2.窗口结束后会自动释放。一般对于Global窗口需要手动设置TTL Best, shizk233 ゞ野蠻遊戲χ 于2020年7月7日周二 下午10:27写道: > 大家好! > > 想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题: > 1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Ti

Re: Flink DataStream 统计UV问题

2020-07-09 Thread tison
你这个需求貌似是要看一天的 UV 的实时更新量,可以看一下 sliding window。如果是每天 0 点清零,实时看今天的 UV,那就是另一个问题了,应该需要自己定义 trigger & evictor 每条触发一次 window...看你数据量吧 Best, tison. shizk233 于2020年7月10日周五 上午10:23写道: > Hi Jiazhi, > > > 1.如果数据流量不是很大的话,按每条数据触发也没问题。另外,基于事件时间的情况,提前触发可以选择ContinuousEventTimeTrigger,可以查看Trigger接口的实现找到你想要的tri

Flink Datastream实现删除操作

2024-06-04 Thread zapjone
各位大佬好: 想请教下,在使用mysql-cdc到iceberg,通过sql方式可以实现自动更新和删除功能。但在使用datastream api进行处理后,注册成临时表,怎么实现类似于sql方式的自动更新和删除呢?

flink DataStream scala api can not overloaded method aggregate

2020-08-18 Thread Zhou Zach
Hi all, 我在使用scala 开发streaming应用时,使用了AggregateFunction with a ProcessWindowFunction,flink 版本是1.11.1,但是idea报错: Error:(80, 8) overloaded method value aggregate with alternatives: 很奇怪为什么不能重载下面的方法 def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](preAggregator: Aggregate

flink DataStream scala api can not overloaded method aggregate

2020-08-18 Thread Zhou Zach
Hi all, 我在使用scala 开发streaming应用时,使用了AggregateFunction with a ProcessWindowFunction,flink 版本是1.11.1,但是idea报错: Error:(80, 8) overloaded method value aggregate with alternatives: 很奇怪为什么不能重载下面的方法 def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation](preAggregator: Aggregate

Flink DataStream 用ElasticsearchSink 写ES ConnectionClosedException异常

2019-09-17 Thread 王佩
在Flink 写ES,当ES集群繁忙时,会有如下异常: 2019-09-17 16:01:02 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase afterBulk 430 Failed Elasticsearch bulk request: Connection closed org.apache.http.ConnectionClosedException: Connection closed at org.apache.http.nio.protocol.HttpAsyncR

Re: flink DataStream scala api can not overloaded method aggregate

2020-12-20 Thread hiaQ
哈喽,请问你这个问题解决了吗?我也遇到了同样的问题... -- Sent from: http://apache-flink.147419.n8.nabble.com/

答复: Flink Datastream实现删除操作

2024-06-04 Thread Xiqian YU
on/ 发件人: zapjone 日期: 星期二, 2024年6月4日 18:34 收件人: user-zh@flink.apache.org 主题: Flink Datastream实现删除操作 各位大佬好: 想请教下,在使用mysql-cdc到iceberg,通过sql方式可以实现自动更新和删除功能。但在使用datastream api进行处理后,注册成临时表,怎么实现类似于sql方式的自动更新和删除呢?

flink dataStream多次sink DAG重复驱动执行?

2021-03-05 Thread lp
有个疑问, 如下程序片段: -- Properties properties = new Properties(); properties.setProperty("bootstrap.servers",kafkaAddr); properties.setProperty("group.id",kafkaOdsGroup); properties.setProperty("auto.offset.reset",kafkaOdsAutoOffsetReset); properties.setProperty(Flin

Re: flink dataStream多次sink DAG重复驱动执行?

2021-03-07 Thread Evan
个人理解是不会重复驱动执行的,具体你可以测试一下,但是从底层原理上讲,我也讲不了。 发件人: lp 发送时间: 2021-03-05 17:31 收件人: user-zh 主题: flink dataStream多次sink DAG重复驱动执行? 有个疑问, 如下程序片段: -- Properties properties = new Properties(); properties.setProperty("bootstrap.servers",kafkaAddr); properties.setProperty

Flink DataStream 作业如何获取到作业血缘?

2024-02-26 Thread casel.chen
一个Flink DataStream 作业从mysql cdc消费处理后写入apache doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink connector信息,包括连接字符串、数据库名、表名等?

求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-18 Thread huang botao
Hi ,请教一个奇怪的问题: streamSource.flatMap(new ComeIntoMaxFlatMapFunction()) .assignTimestampsAndWatermarks(new CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds())) .connect(ruleConfigSource) .process(new MetricDataFilterProcessFunction()) .keyBy((KeySelector) metric -> {

flink datastream api写的代码如何在idea中调试

2023-04-22 Thread m18751805115_1
请教一下,在idea中用flink datastream api写的代码,source输入是一条一条socket流数据,那如何在本地idea中进行调试,观察每条输入数据的运行情况,idea是否支持这种调试?

Re: Flink DataStream 作业如何获取到作业血缘?

2024-02-26 Thread Feng Jin
, Feng On Mon, Feb 26, 2024 at 6:20 PM casel.chen wrote: > 一个Flink DataStream 作业从mysql cdc消费处理后写入apache > doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink > connector信息,包括连接字符串、数据库名、表名等?

Re:求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-18 Thread hailongwang
应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进 在 2020-11-18 15:29:54,"huang botao" 写道: >Hi ,请教一个奇怪的问题: > >streamSource.flatMap(new ComeIntoMaxFlatMapFunction()) > >.assignTimestampsAndWatermarks(new >CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds())) > >.connect(ruleConfigSour

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-18 Thread huang botao
感谢您的回复,是这样的,我这边的环境设置用的是eventTime StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); window设置的是 slid(2,1), 但是等了足够长的时间还是不能触发 apply 方法 On Wed, Nov 18, 2020 at 5:50 PM hailongwang <18868816...@163.com

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-18 Thread zhisheng
可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。 Best zhisheng huang botao 于2020年11月18日周三 下午10:34写道: > 感谢您的回复,是这样的,我这边的环境设置用的是eventTime > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-18 Thread huang botao
hi, zhisheng, hailongwang: 感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect() 后面没有定义watermar导致,在connect后指定watermark就可以触发window了。 On Wed, Nov 18, 2020 at 10:46 PM zhisheng wrote: > 可以检查一下作业消费的 kafka 分区是否都有数据,如果有的分区无数据的话,那么可能会导致水印不会更新,从而窗口触发不了。 > > Best > zhisheng > > huang botao 于2020年11月18日周三

Re: 求助:Flink DataStream 的 windowoperator 后面apply 方法不执行

2020-11-22 Thread 赵一旦
connect前生成watermark也是可以的应该,但是你需要把ruleConfigSource流也赋watermark。我猜是这个地方出问题了。 huang botao 于2020年11月19日周四 下午12:58写道: > hi, zhisheng, hailongwang: > > 感谢对这个问题的解答,这个问题确实出在了window无法触发的地方,原因是 在connect() > 后面没有定义watermar导致,在connect后指定watermark就可以触发window了。 > > > > On Wed, Nov 18, 2020 at 10:46 PM zhishen

Re: flink datastream api写的代码如何在idea中调试

2023-04-22 Thread Feng Jin
支持的, 在 idea 中执行 main 函数即可.执行前,idea 中的运行配置中,最好勾选上: *Include dependencies with "Provided" scope *否则有可能会有 class not found 的报错. Best, Feng Jin On Sat, Apr 22, 2023 at 9:28 PM m18751805115_1 <18751805...@163.com> wrote: > 请教一下,在idea中用flink datastream > api写的代码

Re: flink datastream api写的代码如何在idea中调试

2023-04-22 Thread Feng Jin
g Jin | > | 日期 | 2023年04月22日 21:53 | > | 收件人 | user-zh@flink.apache.org | > | 抄送至 | | > | 主题 | Re: flink datastream api写的代码如何在idea中调试 | > 支持的, 在 idea 中执行 main 函数即可.执行前,idea 中的运行配置中,最好勾选上: *Include dependencies > with "Provided" scope *否则有可能会有 class not found 的报错. >

回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-07 Thread 阿华田
/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java Best, Feng On Mon, Feb 26, 2024 at 6:20 PM casel.chen wrote: 一个Flink DataStream 作业从mysql cdc消费处理后写入apache doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink

回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 Thread 阿华田
日 16:18,Zhanghao Chen 写道: JobGraph 里有个字段就是 jobid。 Best, Zhanghao Chen From: 阿华田 Sent: Friday, March 8, 2024 14:14 To: user-zh@flink.apache.org Subject: 回复: Flink DataStream 作业如何获取到作业血缘? 获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId | | 阿华田

回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 Thread 阿华田
From: 阿华田 Sent: Friday, March 8, 2024 16:48 To: user-zh@flink.apache.org Subject: 回复: Flink DataStream 作业如何获取到作业血缘? ”JobGraph 可以获得 transformation 信息“, JobGraph可以直接获取transformation的信息吗?, 我们是在 SourceTransformation 和SinkTransformation反射拿到链接信息 ,但是这个地方拿不到flinkJobid, JobGraph可以拿到source和

回复:flink datastream api写的代码如何在idea中调试

2023-04-22 Thread m18751805115_1
抱歉啊,可能我没有把问题描述清楚。我是想本地对代码进行debug调试,观察每条流输入后的变量值以及调用栈等信息的。 回复的原邮件 | 发件人 | Feng Jin | | 日期 | 2023年04月22日 21:53 | | 收件人 | user-zh@flink.apache.org | | 抄送至 | | | 主题 | Re: flink datastream api写的代码如何在idea中调试 | 支持的, 在 idea 中执行 main 函数即可.执行前,idea 中的运行配置中,最好勾选上: *Include dependencies with

回复:flink datastream api写的代码如何在idea中调试

2023-04-22 Thread m18751805115_1
thx 回复的原邮件 | 发件人 | Feng Jin | | 日期 | 2023年04月22日 22:22 | | 收件人 | user-zh@flink.apache.org | | 抄送至 | | | 主题 | Re: flink datastream api写的代码如何在idea中调试 | 如果你是要本地 idea debug 线上的作业,需要在 taskManager 的 JVM 参数中开启debug 提交作业时, 添加参数: env.java.opts.taskmanager="-agentlib:jdwp=transport=dt_s

Re: 回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 Thread Zhanghao Chen
JobGraph 里有个字段就是 jobid。 Best, Zhanghao Chen From: 阿华田 Sent: Friday, March 8, 2024 14:14 To: user-zh@flink.apache.org Subject: 回复: Flink DataStream 作业如何获取到作业血缘? 获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId | | 阿华田 | | a15733178...@163.com | 签名

Re: 回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 Thread Zhanghao Chen
/integration/flink/app/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java Best, Zhanghao Chen From: 阿华田 Sent: Friday, March 8, 2024 16:48 To: user-zh@flink.apache.org Subject: 回复: Flink DataStream 作业如何获取到作业血缘? ”JobGraph 可以获得 transformation 信息

Re: 回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 Thread Zhanghao Chen
From: 阿华田 Sent: Friday, March 8, 2024 18:47 To: user-zh@flink.apache.org Subject: 回复: Flink DataStream 作业如何获取到作业血缘? 我们想修改源码 实现任意任务提交实时平台,初始化DAG的时候获取到血缘信息,StreamExecutionEnvironment注册 这种只能写在任务里 不满足需求 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年03月8日 18:23