Periodic output at end of stream

2021-08-13 Thread Matthias Broecheler
Hey guys, I have a KeyedProcessFunction that gathers statistics on the events that flow through and emits them periodically (every few seconds) to a SideOutput. However, at the end of the stream the last set of statistics doesn't get emitted. I read on the mailing list that processing time timers that
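The pattern described in this message (statistics emitted on a time interval, with the last partial interval going missing at end of input) can be illustrated without any Flink API. The following is a minimal plain-Java sketch, not the poster's code: the class `PeriodicStats` and all of its methods are hypothetical names, and timestamps are passed in explicitly instead of coming from timers. It only shows why an explicit flush is needed when emission is otherwise triggered by elapsed time alone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the statistics logic; this is not Flink API. */
class PeriodicStats {
    private final long intervalMillis;
    private final Map<String, Long> counts = new HashMap<>();
    private final List<Map<String, Long>> emitted = new ArrayList<>();
    private long nextEmitAt;

    PeriodicStats(long intervalMillis, long startMillis) {
        this.intervalMillis = intervalMillis;
        this.nextEmitAt = startMillis + intervalMillis;
    }

    /** Records one event; emits the pending counts first if the interval has elapsed. */
    void onEvent(String key, long nowMillis) {
        if (nowMillis >= nextEmitAt) {
            emit();
            nextEmitAt = nowMillis + intervalMillis;
        }
        counts.merge(key, 1L, Long::sum);
    }

    /** Emits whatever is still pending; without this call the last partial interval is lost. */
    void flush() {
        emit();
    }

    private void emit() {
        if (!counts.isEmpty()) {
            emitted.add(new HashMap<>(counts)); // snapshot, so clearing does not affect it
            counts.clear();
        }
    }

    /** Returns every snapshot emitted so far, in order. */
    List<Map<String, Long>> emitted() {
        return emitted;
    }
}
```

In this sketch, events that arrive after the last interval boundary stay in `counts` until `flush()` is called, which mirrors the symptom reported above.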

Upgrading from Flink on YARN 1.9 to 1.11

2021-08-13 Thread Hailu, Andreas [Engineering]
Hello folks! We're looking to upgrade from 1.9 to 1.11. Our Flink applications run on YARN and each have their own clusters, with each application having multiple jobs submitted. Our current submission command looks like this: $ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu

Re: StreamFileSink not closing file

2021-08-13 Thread Matthias Broecheler
Thank you, Yun, for pointing me to the related issue. I'll keep an eye on it. All the best, Matthias
On Wed, Aug 11, 2021 at 10:50 PM Yun Gao wrote:
> Hi Matthias,
>
> Sorry for the late reply, this should be a known issue that Flink would
> lose the last piece of data for bounded dataset with

Re: 1.13 Flamegraphs

2021-08-13 Thread Matthias Pohl
Hi Mason, I'm adding Alex to the thread as he might be able to help answer this question in the most precise way next week. Best, Matthias
On Fri, Aug 6, 2021 at 7:43 PM Mason Chen wrote:
> Hi all,
>
> Does the sample processing also sample threads that do not belong to the
> Flink framework?

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-13 Thread David Morávek
To give you a better idea, at a high level I think it could look something like this [1].
[1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
On Fri, Aug 13, 2021 at 2:57 PM Rion Williams wrote:
> Hi David,
>
> Thanks for

RE: Bug with PojoSerializer? java.lang.IllegalArgumentException: Can not set final double field Event.rating to java.lang.Integer

2021-08-13 Thread Nathan Yu
When the exception is thrown in PojoSerializer for my Event class, yoinks
    String subclassName = source.readUTF(); // subclassName is "java.lang.Integer"
    try {
        actualSubclass = Class.forName(subclassName, true, cl);
    } catch

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-13 Thread Rion Williams
Hi David, Thanks for your response! I think there are currently quite a few unknowns on my end in terms of what production loads look like, but I think the number of clusters shouldn't be too large (and will either rarely change or have new entries come in at runtime, but it needs to support

RE: Bug with PojoSerializer? java.lang.IllegalArgumentException: Can not set final double field Event.rating to java.lang.Integer

2021-08-13 Thread Nathan Yu
Does Flink provide any hooks for objects before/after they are serialized/deserialized?

Re: Bug with PojoSerializer? java.lang.IllegalArgumentException: Can not set final double field Event.rating to java.lang.Integer

2021-08-13 Thread JING ZHANG
Hi Yu, Thias provides a nice method to debug the issue. Big +1. Please try it that way, and feel free to get back for further discussion. Best, JING ZHANG
Schwalbe Matthias wrote on Fri, Aug 13, 2021 at 3:22 PM:
> Good Morning Nathan,
>
> The exception stack does not give enough information yet to come to a
>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-13 Thread David Morávek
Hi Rion, As you probably already know, for dynamic indices, you can simply implement your own ElasticsearchSinkFunction
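With a custom sink function, the target index would be decided per element before the request is built. Below is a plain-Java sketch of only that per-element decision; the sink function itself is not shown. The class `IndexRouter`, the `events-` prefix, and the `tenant` and `date` field names are all hypothetical and do not come from the thread, from Flink, or from Elasticsearch.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical routing helper; none of these names come from Flink or Elasticsearch. */
class IndexRouter {
    /** Derives the target index name from fields carried by the event itself. */
    static String indexFor(Map<String, String> event) {
        // Assumed field names; fall back to fixed values when a field is missing.
        String tenant = event.getOrDefault("tenant", "default");
        String day = event.getOrDefault("date", "unknown");
        // Elasticsearch index names must be lowercase, so normalize the result.
        return ("events-" + tenant + "-" + day).toLowerCase(Locale.ROOT);
    }
}
```

Keeping the routing decision in a small pure function like this makes it easy to unit test separately from the sink.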

Re: ProcessFunctionTestHarnesses for testing Python functions

2021-08-13 Thread Matthias Pohl
Hi Bogdan, from a brief check of the code, it does not look like it is. But maybe Dian can give a more detailed answer here. I'm gonna add him to this thread. Best, Matthias
On Wed, Jun 9, 2021 at 3:47 PM Bogdan Sulima wrote:
> Hi all,
>
> in Java/Scala i was using

Scaling Flink for batch jobs

2021-08-13 Thread Gorjan Todorovski
Hi! I want to implement a Flink cluster as a native Kubernetes session cluster, with the intention of executing Apache Beam jobs that will process only batch data, but I am not sure I understand how I would scale the cluster if I need to process large datasets. My understanding is that to be able to

Flink 1.13.2: results are not correct when using TVF

2021-08-13 Thread 李占阳
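Hi all: While using Flink 1.13.2, I computed statistics with TVF and found that the tumbling-window results do not match the daily totals from the offline detail statistics. Here is my SQL: String message = " CREATE TABLE test(\n" + "gid VARCHAR COMMENT 'uuid unique identifier',\n" + "ip VARCHAR COMMENT 'IP address',\n" + "business_no VARCHAR COMMENT 'merchant number',\n" + "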

Problem calling a Drools server from Flink

2021-08-13 Thread yanyunpeng
Using the Drools client in Flink to execute rules: it works when started locally, but fails with a NullPointerException when deployed to the cluster. KieCommands kieCommands = KieServices.Factory.get().getCommands(); List> commands = new LinkedList<>(); commands.add(kieCommands.newInsert(event, "event")); commands.add(kieCommands.newFireAllRules()); KieCommands kieCommands =

Re: Flink Hive files: checkpoint cannot complete when the upstream Kafka data is very large

2021-08-13 Thread Michael Ran
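Try making the batch and the count a bit smaller ~.~
At 2021-08-12 10:09:21, "周瑞" wrote: Hello, with Flink Hive, when the upstream Kafka data volume is especially large, the checkpoint never completes and an error is reported after 5 minutes. How can this problem be solved?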

RE: Bug with PojoSerializer? java.lang.IllegalArgumentException: Can not set final double field Event.rating to java.lang.Integer

2021-08-13 Thread Schwalbe Matthias
Good Morning Nathan, The exception stack does not yet give enough information to come to a solution. The way I would continue is this:
* Given that you run in a local environment, you could probably run your job in a debugger and
* Place an exception break point for