Filesystem as a stream source in Table/SQL API

2020-11-20 Thread eef hhj
Hi, I'm facing a situation where I want the Flink app to dynamically detect changes in a filesystem batch data source. As I tried in the following example in sql-client.sh, the SELECT can query all the records under the folder. But while I'm adding a new file to the folder, the query
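The kind of table definition in question might look like the following sketch (table name, path, and format are hypothetical; whether a running SELECT picks up files added later depends on whether the connector scans the directory once or monitors it continuously):

```sql
-- Hypothetical filesystem-backed table; in sql-client.sh a SELECT over it
-- reads whatever files are present when the scan starts.
CREATE TABLE fs_source (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/data',   -- hypothetical path
  'format' = 'csv'
);

SELECT * FROM fs_source;
```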

Flink Standalone HA issue

2020-11-20 Thread Fei Han
@all! The Flink version is 1.10.2 and the cluster mode is Flink Standalone HA. Question: with Hadoop HA, both namenodes went down. After restarting the machines I started Hadoop. Watching the Flink Standalone HA cluster, leader election only succeeded after 5 minutes. Is that normal?

Re: Re: flink on yarn: no error logs written to the yarn log after the job FAILED

2020-11-20 Thread air23
But Spark jobs running on YARN do show error logs. On the Flink side the log4j log file is configured, and when running locally the console shows the error cause and the logs. On 2020-11-20 17:53:03, "caozhen" wrote: > >1. Are there any errors in the jobmanager log? >2. Or check the logs via yarn history: yarn logs -applicationId xxx >3. If the job was submitted via the flink client, check the client log > > > >air23 wrote >> Hi >> flink on yarn: no error logs written to yarn

Re: Re: flink on yarn: no error logs written to the yarn log after the job FAILED

2020-11-20 Thread air23
Neither yarn logs -applicationId xxx nor the yarn history logs show the FAILED error logs. On 2020-11-20 17:53:03, "caozhen" wrote: > >1. Are there any errors in the jobmanager log? >2. Or check the logs via yarn history: yarn logs -applicationId xxx >3. If the job was submitted via the flink client, check the client log > > > >air23 wrote >> Hi >> flink on yarn: no error logs written to the yarn log after the job FAILED >> so we cannot locate

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Si-li Liu
Thanks for your reply! Yes, I want A_i and T_i to run in the same slot. Ideally, the T operator should have parallelism 1 in the topology, and all A_i can start from the same timestamp, though some minor difference in the resume timestamps of the different A_i sources is also acceptable. So I think multiple T operators is

Re: Concise example of how to deploy flink on Kubernetes

2020-11-20 Thread Xingbo Huang
Hi George, Have you referred to the official document[1]? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html Best, Xingbo On Saturday, November 21, 2020, George Costea wrote: > Hi there, > > Is there an example of how to deploy a flink cluster on Kubernetes? > I'd

Re: Force Join Unique Key

2020-11-20 Thread Rex Fenley
I have a few more questions. Even if a join has no unique keys, couldn't the join key be used to organize records into a tree of record groups per join key, so that lookups are faster? I have also been looking at the RocksDB docs, and it looks like it has a RangeScan operation. I'm guessing then
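The layout being described can be sketched with a plain sorted map: records keyed by a composite (joinKey, recordId) key, so all records for one join key sit in a contiguous range, which is exactly what a RangeScan-style lookup exploits. A minimal JDK sketch (the key encoding and all names are illustrative only, not Flink's actual state layout):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class RangePerJoinKey {
    // Build a small sorted store and range-scan all records for join key "user1".
    public static List<String> valuesForJoinKey() {
        // TreeMap keeps keys sorted, like an ordered KV store (e.g. RocksDB)
        TreeMap<String, String> store = new TreeMap<>();
        store.put("user1|rec1", "a");
        store.put("user1|rec2", "b");
        store.put("user2|rec1", "c");
        // the contiguous range ["user1|", "user1|~") holds exactly join key user1
        return new ArrayList<>(store.subMap("user1|", "user1|~").values());
    }

    public static void main(String[] args) {
        System.out.println(valuesForJoinKey()); // prints [a, b]
    }
}
```

The point of the sketch: fetching all records for one join key is a single contiguous scan rather than a full-table probe, even when the join key is not unique.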

Concise example of how to deploy flink on Kubernetes

2020-11-20 Thread George Costea
Hi there, Is there an example of how to deploy a flink cluster on Kubernetes? I'd like to deploy the flink cluster, a kafka-broker, and then the greeter example to give it a try. Thanks, George

Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
Hm, yes, those are good reasons. The issue is that if you put it into Flink, then it's part of Flink's system classloader, so there is no way to unload classes or to protect Flink's classes (and its dependencies) from being overwritten by your dependencies. I'm thinking that this may cause differences

Re: Logs of JobExecutionListener

2020-11-20 Thread Flavio Pompermaier
I think that the problem is that my REST service submits the job to the Flink standalone cluster and responds to the client with the submitted job ID. To achieve this, I was using the RestClusterClient because with that I can use the following code and retrieve the JobID: (1) JobID flinkJobId

Re: Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
A couple of reasons I've done that: it's listed as an option here: https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#advanced-customization under optional libraries, and I have over 200 jobs running that rely on the same core functionality provided by the jar in

Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
Oh no, please never put user code (with included libraries) into Flink's classpath. It's not supported, exactly for classloader reasons. Why did you think that this would be a good approach? Is your jar too big? Maybe a different deployment mode would be more appropriate? [1] Alternatively, if

Hi, I'm having problems with self-signed certificate trust with Native K8s

2020-11-20 Thread Kevin Kwon
Hi, I am using MinIO as an S3 mock backend for Native K8s. Everything seems to be fine except that it cannot connect to S3, since the trust store for the self-signed certificates is not cloned into the Deployment resources. Below is, in order, how I add the trusted keystore using keytool and how I run my app

Re: Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
I just switched from providing my jar while creating a remote environment to providing this jar on Flink's classpath. It used to work just fine when the jar was shipped to Flink with the job graph. Now that the jar is available to Flink on startup, the same job that used to run is failing with

Re: Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
Thanks, Arvid. That is what I thought too. I went through all the instances where it might have been a member variable and made sure that it's declared as transient :-( Is there anything else I can check? Alex On Fri, Nov 20, 2020 at 11:50 AM Arvid Heise wrote: > Are you using ObjectMapper as a

Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
The easiest solution for all non-trivial issues like this is to start the application locally in a local executor, so you can debug in your IDE. Additionally, double-check that you have no lambdas/anonymous classes that reference outer classes with ObjectMapper. ObjectMapper should also be static

Re: Filter Null in Array in SQL Connector

2020-11-20 Thread Rex Fenley
Btw, this is what our source and sink essentially look like, with some columns redacted. CREATE TABLE source_kafka_data ( id BIGINT, roles ARRAY, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'topic' = 'topic', 'properties.bootstrap.servers' = 'kafka',
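One common way to strip NULL elements out of an ARRAY column is a small scalar UDF. The core of such a UDF is just a null filter over the array; the sketch below shows that logic in plain Java (the class and method names are hypothetical, and registering it as a Flink `ScalarFunction` is omitted):

```java
import java.util.Arrays;

// Hypothetical helper: the eval() body one might wrap in a Flink ScalarFunction
// to drop NULL entries from an ARRAY<BIGINT> column such as `roles`.
public class ArrayCompact {
    public static Long[] compact(Long[] roles) {
        if (roles == null) {
            return new Long[0];
        }
        return Arrays.stream(roles)
                .filter(r -> r != null)   // drop NULL elements, keep order
                .toArray(Long[]::new);
    }

    public static void main(String[] args) {
        Long[] in = {1L, null, 3L};
        System.out.println(Arrays.toString(compact(in))); // prints [1, 3]
    }
}
```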

Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
Are you using ObjectMapper as a non-transient field? If so, please make it transient and initialize in open() of a Rich*Function. On Fri, Nov 20, 2020 at 7:56 PM Alexander Bagerman wrote: > Hi, > I added my custom jar (that includes dependencies on Jackson) to Flink > classpath. It seems to be
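The failure mode behind this advice can be reproduced with plain Java serialization: a transient field survives the round trip as null and therefore must be re-created in open(). A minimal sketch, with a plain Object standing in for ObjectMapper and a hypothetical open() mirroring the Rich*Function pattern:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: a function-like class whose ObjectMapper stand-in is transient
// and re-created in open(), mirroring the Rich*Function pattern.
public class TransientFieldDemo implements Serializable {
    // transient: skipped during serialization, null after deserialization
    private transient Object mapper;

    public void open() {           // hypothetical open(), as in RichMapFunction
        mapper = new Object();     // stands in for `new ObjectMapper()`
    }

    public boolean ready() { return mapper != null; }

    public static TransientFieldDemo roundTrip(TransientFieldDemo f) throws Exception {
        // roughly what happens when the function is shipped to a task manager
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(f);
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (TransientFieldDemo) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        TransientFieldDemo f = new TransientFieldDemo();
        f.open();
        TransientFieldDemo shipped = roundTrip(f);
        System.out.println(shipped.ready());   // false: transient field is gone
        shipped.open();                        // re-initialize on the "task" side
        System.out.println(shipped.ready());   // true
    }
}
```

This is why initializing the mapper in open() rather than at construction time is the safe pattern: the value created on the client never travels with the serialized function.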

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread fuyao . li
Hi Timo, One more question: the blog also mentioned a Jira task to solve this issue, https://issues.apache.org/jira/browse/FLINK-10886. Will this feature be available in 1.12? Thanks! Best, Fuyao On 11/20/20 11:37, fuyao...@oracle.com wrote: Hi Timo, Thanks for your reply! I think your

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread fuyao . li
Hi Timo, Thanks for your reply! I think your suggestions are really helpful! The good news is that I had managed to figure out some of it by myself a few days ago. 1. Thanks for the update about the table parallelism issue! 2. After trying out the idleness setting, it prevents some idle

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Arvid Heise
Your topology is definitely interesting and makes sense to me on a high level. The main question remaining is the parallelism. I'm assuming you run your pipeline with parallelism p, and both source A and timestamp calculator T run with parallelism p. You want to create a situation where for

Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
Hi, I added my custom jar (which includes dependencies on Jackson) to the Flink classpath. It seems to be loaded just fine. But when the job starts I am getting the exception below. I am not sure how to interpret the exception, though, and would appreciate it if somebody could give me advice on it. Thanks, Alex

Re: Dynamic ad hoc query deployment strategy

2020-11-20 Thread lalala
Hi Kostas, Thank you for your response. Is what you are saying valid for session mode? If I submit my jobs to the existing Flink session, will they be able to share the sources? We do register our Kafka tables to `GenericInMemoryCatalog`, and the documentation says `The GenericInMemoryCatalog

Non uniform distribution of subtasks even with cluster.evenly-spread-out-slots

2020-11-20 Thread Harshit Hajela
Hi Flink Community, I'm currently running a heavy Flink job on Flink 1.9.3 that has a lot of subtasks, and I'm observing some subtask distribution issues. The job in question has 9288 subtasks, and they are running on a large set of TMs (total available slots: 1792). I'm using the

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread Timo Walther
Hi Fuyao, sorry for not replying earlier. You posted a lot of questions. I scanned the thread quickly; let me try to answer some of them, and feel free to ask further questions afterwards. "Is it possible to configure the parallelism for Table operations at the operator level?" No, this is not

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Si-li Liu
Thanks for your reply. I want to join stream A and stream B. Items in stream A come in first, and I keep them in a memory cache as join key and item; then several minutes later the items in stream B come in and the join work is performed. The timestamp of the latest expired item in the memory

Re: Jdbc input format and system properties

2020-11-20 Thread Arvid Heise
No magic for JVM properties afaik (and I just looked in the code base for the most obvious candidates). There is also nothing to gain from overwriting properties. I'm also certain that it should work as it's used in most secured setups to inject keys/keytabs. What happens if you execute the Flink

Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
Yes, that's what is surprising... I already did a remote debug on the TM, and that property is not read. That's really weird. Could it be that the JVM properties get cleared before the tasks are invoked? On Fri, 20 Nov 2020, 12:50 Arvid Heise wrote: > All looks good and as it should be. > >

Re: Jdbc input format and system properties

2020-11-20 Thread Arvid Heise
All looks good and as it should be. Can you do a remote debugging session on the TM once more and check Boolean.getBoolean("com.mysql.disableAbandonedConnectionCleanup")? There is no magic involved in System properties in Flink. If the property is set on the process, the configuration works. If
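The lookup suggested here can be sanity-checked outside Flink in a few lines: Boolean.getBoolean reads a JVM system property and returns true only if that exact property name is set to "true". The property name below is taken from the thread; everything else is a plain JDK call:

```java
// Minimal check of how Boolean.getBoolean resolves a JVM system property;
// in the real setup the property would come from
// -Dcom.mysql.disableAbandonedConnectionCleanup=true on the process.
public class SysPropCheck {
    public static void main(String[] args) {
        String key = "com.mysql.disableAbandonedConnectionCleanup";
        System.out.println(Boolean.getBoolean(key));   // false: property not set

        System.setProperty(key, "true");               // simulate the -D flag
        System.out.println(Boolean.getBoolean(key));   // true
    }
}
```

If this returns false inside the task manager even though the -D flag is on the TM's command line, the property is genuinely missing from that process, which is what the remote debugging session would confirm.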

Reply: flink-1.11.1 Table API/SQL cannot write to a Hive ORC table

2020-11-20 Thread 迎风浪子
Hi all, when I write files in ORC format, it reports that the file cannot be found, but dfs -ls shows the file does exist. How can I resolve this? Thanks! --- Original message --- From: "amen...@163.com" https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#program-maven best, amenhub From: Jian Wang Sent: 2020-08-31 21:55 To: user-zh Subject: flink-1.11.1 Table API/SQL cannot write to a Hive ORC table Hi

Re: Re: Does FlinkSQL support completing missing fields and setting default values in DDL?

2020-11-20 Thread hailongwang
Do you mean that in CREATE TABLE you could set a default value for each column, so that when the column's value is absent the default value is used directly, just like in a traditional DB? Best, Hailong On 2020-11-20 16:21:28, "Jark Wu" wrote: >What do you mean by completing fields? Do you have a concrete example? Automatic schema inference? > >Best, >Jark > >On Fri, 20 Nov 2020 at 17:09, 孟小鹏 wrote: > >> A current pain point: can FlinkSQL complete missing fields and set default values in DDL? That would save the ETL step >>
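Flink's CREATE TABLE has no per-column DEFAULT clause like a traditional DB, but a computed column with COALESCE can approximate one. A hedged sketch (table, column names, and the connector options are hypothetical):

```sql
-- Hypothetical table: `status_raw` comes from the source and may be NULL or
-- absent in the payload; the computed column fills in a default at read time.
CREATE TABLE orders (
  id BIGINT,
  status_raw STRING,
  status AS COALESCE(status_raw, 'UNKNOWN')  -- poor man's DEFAULT
) WITH (
  'connector' = 'kafka'
  -- ...remaining connector options...
);
```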

Re: flink on yarn: no error logs written to the yarn log after the job FAILED

2020-11-20 Thread caozhen
1. Are there any errors in the jobmanager log? 2. Or check the logs via yarn history: yarn logs -applicationId xxx 3. If the job was submitted via the flink client, check the client log. air23 wrote > Hi > flink on yarn: no error logs written to the yarn log after the job FAILED > so we cannot locate what exactly caused the job to fail. How do I configure the logs to go into the yarn log? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: State of Machine Learning with Flink and especially FLIP-39

2020-11-20 Thread Niklas Wilcke
Hi Arvid and Jiangjie, thanks to both of you for the quick and valuable response. I will take a look at the linked projects. Kind Regards, Niklas -- niklas.wil...@uniberg.com Mobile: +49 160 9793 2593 Office: +49 40 2380 6523 Simon-von-Utrecht-Straße 85a 20359 Hamburg UNIBERG GmbH

Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
I've just tested the following code in a Java class, and the property (-Dcom.mysql.cj.disableAbandonedConnectionCleanup=true) is read correctly, and abandonedConnectionCleanupDisabled does not initialize the cleanupThreadExecutorService (which in my other test was causing a dynamic classloading

Re: Questions about deploying a Flink on K8s Job Cluster

2020-11-20 Thread caozhen
To use the official Flink on K8s per-job mode: [1] In per-job mode, every new job requires creating the corresponding TM and JM via taskmanager-job-deployment.yaml and jobmanager-job.yaml. To terminate a job, delete the corresponding yaml resources, i.e. terminate the TM and JM: kubectl delete -f taskmanager-job-deployment.yaml kubectl delete -f jobmanager-job.yaml [1]

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-20 Thread Aljoscha Krettek
Sure, my pleasure! Aljoscha On 19.11.20 16:12, Simone Cavallarin wrote: Many thanks for the Help!! Simone From: Aljoscha Krettek Sent: 19 November 2020 11:46 To: user@flink.apache.org Subject: Re: How to use EventTimeSessionWindows.withDynamicGap() On

Re: FlinkSQL 支持DDL时 补全字段并进行默认值设置吗?

2020-11-20 Thread Jark Wu
What do you mean by completing fields? Do you have a concrete example? Automatic schema inference? Best, Jark On Fri, 20 Nov 2020 at 17:09, 孟小鹏 wrote: > A current pain point: can FlinkSQL complete missing fields and set default values in DDL? That would save the ETL step > > Is this being considered?

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-20 Thread Pierre Oberholzer
Hi Wei, Thanks for the hint. May I follow up by adding more context and asking for your guidance? The Map[String,Any] object in question, returned by Scala: - Has a defined schema (incl. nested) with up to 100k (!) different possible keys - Has only some portion of the keys populated

Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
No, no, I didn't relocate any class related to JDBC. On Fri, 20 Nov 2020, 10:02 Arvid Heise wrote: > I was particularly asking if you relocate classes. Since the property name > looks like a class name, it could have been changed as well. Could you > check the value of >

Re: Jdbc input format and system properties

2020-11-20 Thread Arvid Heise
I was particularly asking if you relocate classes. Since the property name looks like a class name, it could have been changed as well. Could you check the value of PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup in your final jar? On Fri, Nov 20, 2020 at 9:35 AM Flavio Pompermaier

Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-11-20 Thread Kostas Kloudas
Hi Hector, The main reasons for deprecating readFileStream() were that: 1) it was only capable of parsing Strings, and in a rather limited way, as one could not even specify the encoding; 2) it was not fault-tolerant, so your concerns about exactly-once were not covered. One concern that I can

Re: Dynamic ad hoc query deployment strategy

2020-11-20 Thread Kostas Kloudas
I am also cc'ing Timo to see if he has anything more to add on this. Cheers, Kostas On Thu, Nov 19, 2020 at 9:41 PM Kostas Kloudas wrote: > > Hi, > > Thanks for reaching out! > > First of all, I would like to point out that an interesting > alternative to the per-job cluster could be running

Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
The MySQL connector is put in the client classpath and in the Flink lib dir. When I debugged remotely, the AbandonedConnectionCleanupThread was initialized at the first run of the job by the taskmanager. Today I'll try to run the MySQL connector in a standalone Java app to see if the property is

Two streams unioned in a Flink job, but only one stream's data is sent downstream

2020-11-20 Thread Yang Peng
Hi, a question: in one of our Flink real-time jobs we union two streams A and B and then connect the result with stream C. The data flow is roughly: D = A.union(B); C.connect(D).keyBy(C.key, D.key).process().addSink(kafkaProducer); Streams A and B have the same data type but different parallelism: A's parallelism is high and B's is low. At some point stream A had a spike in traffic, and afterwards the data output to Kafka contained no data from stream A, only data from stream B.