Re: low performance in running queries

2019-10-31 Thread Zhenghua Gao
2019-10-30 15:59:52,122 INFO org.apache.flink.runtime.taskmanager.Task - Split Reader: Custom File Source -> Flat Map (1/1) (6a17c410c3e36f524bb774d2dffed4a4) switched from DEPLOYING to RUNNING.
2019-10-30 17:45:10,943 INFO org.apache.flink.runtime.taskmanager.Task

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Peter Huang
Hi Tison and Community, Thanks for bringing it up. Actually, we hit a similar bottleneck using per-cluster mode. Our team built a service for deploying and operating Flink jobs. The service sits in front of YARN clusters. To submit different job jars, we need to download the client jar into the

Re: Preserving (best effort) messages order between operators

2019-10-31 Thread Huyen Levan
Hi Yun, My job graph is: (A: 1) -(rebalance)-> (B: 32) -(hash)-> (C: 32). A lists files and forwards them to B as FileInputSplits. B parses those files and shuffles the data records to C as keyed streams. C is the slowest in the graph, A is the fastest. I relied on the slf4j/logback logs to derive that

Re: How to write type information for a Java Set and List inside a Tuple?

2019-10-31 Thread Jingsong Lee
Hi Komal: I think snippet 1 is better, because it carries more information, like ListTypeInfo. Consider snippet 2: our type inference in TypeInformation.of cannot infer the nested information. (It does not get the information: List) On Fri, Nov 1, 2019 at 11:49 AM Jingsong Li wrote: > Hi Komal: >
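For readers without the original snippets, here is a minimal sketch of the two approaches being contrasted, assuming a Tuple2 that carries a List and a Set (the concrete element types are illustrative):

    import java.util.List;
    import java.util.Set;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.ListTypeInfo;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;

    public class NestedTupleTypes {
        public static void main(String[] args) {
            // "Snippet 1" style: spell out the nested type information, so the
            // List's element type (String) is preserved via ListTypeInfo.
            TypeInformation<Tuple2<List<String>, Set<Integer>>> explicit =
                    new TupleTypeInfo<>(
                            new ListTypeInfo<>(Types.STRING),
                            // Flink has no dedicated Set type info; a TypeHint at
                            // least keeps the generic parameters (serialization may
                            // fall back to a generic/Kryo serializer).
                            TypeInformation.of(new TypeHint<Set<Integer>>() {}));
            System.out.println(explicit);

            // "Snippet 2" style with a raw Class cannot see the generic
            // parameters: it gets List, not List<String>.
        }
    }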

Re: Re: How to periodically write state to external storage

2019-10-31 Thread misaki L
How about aggregating with a window and writing in batches? wangl...@geekplus.com.cn wrote on Fri, Nov 1, 2019 at 10:17 AM: > Hi Congxian, > > Even when written out via a sink, the data eventually has to land somewhere before it can be queried. > In our case it is written to MySQL > > > > wangl...@geekplus.com.cn > > Sender: Congxian Qiu > Send Time: 2019-11-01 10:10 > Receiver: user-zh > Subject: Re: How to periodically write state to external storage >

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread bupt_ljy
Hi all, Firstly thanks @tison for bringing this up, and strongly +1 for the overall design. I’d like to add one more example of "multiple jobs in one program" with what I’m currently working on. I’m trying to run TPC-DS benchmark testing (including tens of SQL query jobs) on Flink and

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Rong Rong
Hi All, Thanks @Tison for starting the discussion; I think we have a very similar scenario to Theo's use cases. In our case we also generate the job graph using a client service (which serves job graph generation for multiple user codes), and we've found that managing the

Re: Preserving (best effort) messages order between operators

2019-10-31 Thread Yun Gao
Hi Averell, If I understood right, the job graph is A (parallelism = 1) --> B (parallelism > 1); then I think the records sent into each subtask B_i should arrive in the same order they were sent out from A. Therefore, could you also provide more details on the topology? Is there

Re: Re: How to periodically write state to external storage

2019-10-31 Thread wangl...@geekplus.com.cn
Hi Congxian, Even when written out via a sink, the data eventually has to land somewhere before it can be queried. In our case it is written to MySQL. wangl...@geekplus.com.cn Sender: Congxian Qiu Send Time: 2019-11-01 10:10 Receiver: user-zh Subject: Re: How to periodically write state to external storage Curious why you want to write the State to external storage periodically. Does an external system need to use this State? If so, why not write the State out via a sink? Best, Congxian Jun Zhang

Re: How to periodically write state to external storage

2019-10-31 Thread Congxian Qiu
Curious why you want to write the State to external storage periodically. Does an external system need to use this State? If so, why not write the State out via a sink? Best, Congxian Jun Zhang <825875...@qq.com> wrote on Thu, Oct 31, 2019 at 10:36 AM: > Could you register a timer? > > > Have a look at this article and see whether it helps: > > > https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA > On 2019-10-31 10:16, wangl...@geekplus.com.cn wrote: > > >
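A minimal sketch of the timer idea suggested above; the state name and the one-minute interval are illustrative, and the emitted stream would then be routed to a MySQL sink:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Emits the current state of each key once per minute; a downstream
    // JDBC/MySQL sink can then persist the emitted snapshots.
    public class PeriodicStateExport extends KeyedProcessFunction<String, Long, String> {

        private transient ValueState<Long> counter;

        @Override
        public void open(Configuration parameters) {
            counter = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Long.class));
        }

        @Override
        public void processElement(Long value, Context ctx, Collector<String> out) throws Exception {
            Long current = counter.value();
            counter.update(current == null ? value : current + value);
            // (Re-)arm a processing-time timer at the next minute boundary;
            // timers are deduplicated per key and timestamp.
            long nextMinute = ctx.timerService().currentProcessingTime() / 60_000 * 60_000 + 60_000;
            ctx.timerService().registerProcessingTimeTimer(nextMinute);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Forward the key's current state; route this stream to a MySQL sink.
            out.collect(ctx.getCurrentKey() + "=" + counter.value());
        }
    }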

Re: Re: Flink State TTL expiration cleanup issue

2019-10-31 Thread wangl...@geekplus.com.cn
Thanks, understood. Wang Lei wangl...@geekplus.com.cn Sender: Yun Tang Send Time: 2019-11-01 01:38 Receiver: user-zh@flink.apache.org Subject: Re: Flink State TTL expiration cleanup issue Hi Wang Lei, Judging from your configuration and your use of Flink 1.7, the active cleanup of expired data is not enabled [1]; I suggest configuring cleanupFullSnapshot on the StateTtlConfig, so that when you take a full

Re: No FileSystem for scheme "file" for S3A in and state processor api in 1.9

2019-10-31 Thread spoganshev
The problem happens in batch jobs (the ones that use ExecutionEnvironment) that use the state processor api for bootstrapping the initial savepoint for a streaming job. We are building a single docker image for the streaming and batch versions of the job. In that image we put both presto (which we use for

Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread kant kodali
Hi Huyen, That is not my problem statement. If it were just ingesting from A to B, I am sure there are enough tutorials for me to get it done. I also feel like the more I elaborate the more confusing it gets, and I am not sure why. I want to join two streams and I want to see/query the results of

Re: Flink State TTL expiration cleanup issue

2019-10-31 Thread Yun Tang
Hi Wang Lei, Judging from your configuration and your use of Flink 1.7, the active cleanup of expired data is not enabled [1]. I suggest configuring cleanupFullSnapshot on the StateTtlConfig, so that expired data is not kept in your savepoint content when you take a full snapshot (i.e., a savepoint). If you do not enable active cleanup, expired entries are only cleaned up when the original data is read, which is probably why your savepoint keeps growing. Also, I suggest upgrading to Flink 1.8+, which has better support for the state TTL feature; see the Chinese article [2]. [1]
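For reference, a minimal sketch of the cleanupFullSnapshot configuration mentioned above; the 7-day TTL and the state name are illustrative:

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    public class TtlExample {
        public static void main(String[] args) {
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.days(7))
                    // Drop expired entries whenever a full snapshot
                    // (e.g. a savepoint) is taken.
                    .cleanupFullSnapshot()
                    .build();

            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("my-state", Long.class);
            descriptor.enableTimeToLive(ttlConfig);
        }
    }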

Re: Stateful functions presentation code (UI part)

2019-10-31 Thread Flavio Pompermaier
Thanks Igal, this is more or less what I was expecting... this implies that ALL events are received on the UI side. I was concerned about the trade-offs of this choice: when I zoom on the map I could simply ignore messages outside the boundaries (but I would still spend a lot of CPU resources in the reading of

Re: possible backwards compatibility issue between 1.8->1.9?

2019-10-31 Thread Piotr Nowojski
Hi, It sounds strange. In the second example, aren’t you just setting the “name” and “uid” for the last “map” transformation, while you would like to set them for `unorderedWait` and `filter` as well? I guess you can check this out in your application logs. Can you check what are the actual
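To illustrate the point, a sketch (with hypothetical stream and function names) where each transformation gets its own uid/name rather than only the last one:

    import java.util.concurrent.TimeUnit;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.AsyncFunction;

    public class UidPerStage {
        // 'input' and 'asyncFn' are placeholders for the job's real stream/function.
        static DataStream<String> wire(DataStream<String> input, AsyncFunction<String, String> asyncFn) {
            // uid()/name() apply only to the operator produced by the immediately
            // preceding transformation, so each stage needs its own call:
            return AsyncDataStream.unorderedWait(input, asyncFn, 1, TimeUnit.SECONDS, 100)
                    .uid("enrich-async").name("Enrich (async)")   // uid for the async operator
                    .filter(s -> !s.isEmpty())
                    .uid("drop-empty").name("Drop empty")         // uid for the filter
                    .map(String::trim)
                    .uid("trim").name("Trim");                    // uid for the map
        }
    }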

Re: Stateful functions presentation code (UI part)

2019-10-31 Thread Igal Shilman
For that particular example, the simulator [1] is responsible for simulating physical drivers and passengers that interact with their corresponding stateful functions [2]. The interaction between the simulator and the stateful functions happens via four Kafka topics: * to-driver - messages

RE: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Chan, Regina
Yeah, just chiming in on this conversation as well. We heavily use multiple job graphs to get isolation around retry logic and resource allocation across the job graphs. Putting all these parallel flows into a single graph would mean sharing TaskManagers across what was meant to be truly

Re: Stateful functions presentation code (UI part)

2019-10-31 Thread Flavio Pompermaier
Yes, I'm interested in how to read data from a UI... which egress should I use? If we use a Kafka queue, how do we filter the data received in the topic? Should I use a correlation id, or a new topic per user? On Thu, Oct 31, 2019, 16:08 Igal Shilman wrote: > Hi Flavio, > > We haven't included the
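For the correlation-id option, a hypothetical UI-side consumer might look like this; the topic name, session id, and bootstrap servers are placeholders, and this is a sketch rather than anything from the example code:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Reads the egress topic and keeps only the records whose key matches
    // this UI session's correlation id.
    public class EgressFilter {
        public static void main(String[] args) {
            String correlationId = "session-42"; // assigned when the UI connects

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "ui-" + correlationId); // one group per UI session
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("egress-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        if (correlationId.equals(record.key())) {
                            System.out.println(record.value()); // push to the UI
                        }
                    }
                }
            }
        }
    }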

Re: Stateful functions presentation code (UI part)

2019-10-31 Thread Igal Shilman
Hi Flavio, We haven't included the UI source code just yet; we've only used it for demos and talks. The reasons are that (1) we didn't put a lot of effort and time there and (2) we didn't have time to go through the individual dependencies and licences. But we will add that very soon. Would having

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Theo Diefenthal
I agree with Gyula Fóra. In our case, we have a client machine in the middle between our YARN cluster and some backend services, which cannot be reached directly from the cluster nodes. On application startup, we connect to some external systems, get some information crucial for the job

State restoration from checkpoint

2019-10-31 Thread Parth Sarathy
Hi, Recently we upgraded our application, in which a Flink windowed transformation is used. The earlier version of the application used Flink 1.7.2 while the new version uses

Re: low performance in running queries

2019-10-31 Thread Habib Mostafaei
I enclosed all logs from the run, and for this run I used parallelism one. However, for other runs I checked and found that all parallel workers were working properly. Is there a simple way to get profiling information in Flink? Best, Habib On 10/31/2019 2:54 AM, Zhenghua Gao wrote: I think

Re: Stateful functions presentation code (UI part)

2019-10-31 Thread Flavio Pompermaier
Hi Vino, I already checked that code but I can't find the UI part :( On Thu, Oct 31, 2019 at 12:32 PM vino yang wrote: > Hi Flavio, > > Please see this link.[1] > > Best, > Vino > > [1]: >

Re: Stateful functions presentation code (UI part)

2019-10-31 Thread vino yang
Hi Flavio, Please see this link.[1] Best, Vino [1]: https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example Flavio Pompermaier wrote on Thu, Oct 31, 2019 at 4:53 PM: > Hi to all, > would it be possible to provide also the source code

Re: RemoteEnvironment cannot execute job from local.

2019-10-31 Thread Simon Su
Hi Till, Thanks for your reply. Actually I modified the code like this: I commented out the filter part and re-ran the code, and then it works well! The jar passed to createRemoteEnvironment is a UDF jar, which does not contain my code. My Flink version is 1.9.0, so I’m confused about the actual

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Flavio Pompermaier
Hi all, we use multiple jobs in one program a lot, and this is why: when you fetch data from a huge number of sources and, for each source, you do some transformation and then you want to write the union of all outputs into a single directory (this assumes you're doing batch). When the

Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Huyen Levan
Hi Kant, So your problem statement is "ingest 2 streams into a data warehouse". The main component of the solution, in my view, is that SQL server. You can have a sink function that inserts the records of your two streams into two different tables (A and B), or upserts them into one single table C. That
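A sketch of the upsert variant with plain JDBC; the table, columns, and connection details are illustrative, and a production sink would batch writes and handle failures:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Upserts (key, value) pairs into a MySQL table with a primary key on `k`.
    public class MySqlUpsertSink extends RichSinkFunction<Tuple2<String, String>> {

        private transient Connection connection;
        private transient PreparedStatement statement;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/mydb", "user", "password");
            statement = connection.prepareStatement(
                    "INSERT INTO joined (k, v) VALUES (?, ?) "
                            + "ON DUPLICATE KEY UPDATE v = VALUES(v)");
        }

        @Override
        public void invoke(Tuple2<String, String> record, Context context) throws Exception {
            statement.setString(1, record.f0);
            statement.setString(2, record.f1);
            statement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            if (statement != null) statement.close();
            if (connection != null) connection.close();
        }
    }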

Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread kant kodali
Hi Averell, yes, I want to run ad-hoc SQL queries on the joined data as well as on data that may join in the future. For example, if you take datasets A and B in streaming mode, a row in A can join with a row in B at some time in the future, but meanwhile if I query the intermediate

Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Piotr Nowojski
Hi, Generally speaking it’s a good question: why do you need to do this? What information do you need from the outer join’s internal state? Can you not just process the result to obtain the same information in another way? > Yes, I am looking for this but I am not sure how to do this? Should

Re: RemoteEnvironment cannot execute job from local.

2019-10-31 Thread Till Rohrmann
In order to run the program on a remote cluster from the IDE, you need to first build the jar containing your user code. This jar needs to be passed to createRemoteEnvironment() so that the Flink client knows which jar to upload. Hence, please make sure that /tmp/myudf.jar contains your user code.
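A minimal sketch of what Till describes, with a placeholder host and jar path:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class RemoteJob {
        public static void main(String[] args) throws Exception {
            // The trailing argument(s) are jar files shipped to the cluster; they
            // must contain every user-defined class the job uses (filters, UDFs, ...).
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                    "flink-jobmanager-host", 8081, "/path/to/my-job-with-udfs.jar");

            DataSet<String> data = env.fromElements("a", "bb", "ccc");
            // The class containing this filter lambda must be inside the jar above.
            data.filter(s -> s.length() > 1).print();
        }
    }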

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Yang Wang
Thanks to tison for starting this exciting discussion. We also suffer a lot from the per-job mode. I think a per-job cluster is a dedicated cluster for only one job, and it will not accept other jobs. It has the advantage of one-step submission: no need to start a dispatcher first and then

Re: How to filter out abnormal timestamps?

2019-10-31 Thread 邢瑞斌
Understood, thanks a lot! Yun Tang wrote on Thu, Oct 31, 2019 at 3:10 PM: > Hi Ruibin > > When downstream operators use window operations, the timestamp they rely on is this ingestion time; if your message has a field with "event > time" semantics, you can then compare the ingestion time generated at the source against the time that field represents. > > > > On 10/31/19, 10:45 AM, "邢瑞斌" wrote: > > Hi Yun, > >

Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Gyula Fóra
Hi all, Regarding the compilation part: I think there are upsides and downsides to building the Flink job (running the main method) on the client side; however, since this is the current way of doing it, we should have a very powerful reason to change the default behaviour. While there is a possible

Re: Are Dynamic tables backed by RocksDB?

2019-10-31 Thread Fabian Hueske
Hi, Dynamic tables might not be persisted at all; they are only persisted when it is necessary for the computation of a query. For example, a simple "SELECT * FROM t WHERE a = 1" query on an append-only table t does not require persisting t. However, there are a bunch of operations that require storing some
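A small illustration of the two cases, assuming a table t has already been registered with the environment:

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class DynamicTableState {
        static void queries(StreamTableEnvironment tableEnv) {
            // Filter-only: evaluated record by record, no state kept for t.
            Table stateless = tableEnv.sqlQuery("SELECT * FROM t WHERE a = 1");
            // Grouped aggregation: one accumulator per key is kept in the
            // configured state backend (RocksDB, if so configured).
            Table perKeyCounts = tableEnv.sqlQuery("SELECT a, COUNT(*) AS cnt FROM t GROUP BY a");
        }
    }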

Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Averell
Hi Kant, Not sure what you meant by "query it using SQL". Do you mean running ad-hoc SQL queries on that joined data? If that's what you meant, then you'll need some SQL server first, and then write the joined data to that SQL server. ElasticSearch and Cassandra are ready-to-use options.

Stateful functions presentation code (UI part)

2019-10-31 Thread Flavio Pompermaier
Hi to all, would it be possible to provide also the source code of the UI part of the ride-sharing example? I'd be interested in how the UI reads the data from the Kafka egress. Best, Flavio

Are Dynamic tables backed by RocksDB?

2019-10-31 Thread kant kodali
Hi All, Are Dynamic tables backed by RocksDB or in memory? If they are backed by RocksDB, can I use SQL to query the state? Thanks!

Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread kant kodali
Hi Averell, I want to write intermediate results (A join B) incrementally and in real time to some external storage so I can query them using SQL. I am new to Flink, so I am trying to find out: 1) does such a mechanism exist? 2) If not, what are the alternatives? Thanks On Thu, Oct 31, 2019 at 1:42 AM

Re: possible backwards compatibility issue between 1.8->1.9?

2019-10-31 Thread Piotr Nowojski
Hi, (This question is more appropriate for the user mailing list, not dev - when responding to my e-mail please remove the dev mailing list from the recipients; I’ve kept it just FYI that the discussion has moved to the user mailing list). Could it be that the problem is caused by changes in chaining

Re: How to stream intermediate data that is stored in external storage?

2019-10-31 Thread Averell
Hi Kant, I wonder why you need to "source" your intermediate state from files? Why not "source" it from the previous operator? I.e. instead of (A join B) -> State -> files -> (C), why not do (A join B) -> State -> (files + C)? -- Sent from:

Async operator with a KeyedStream

2019-10-31 Thread bastien dine
Hello, I would like to know if you can use a KeyedStream with the async operator: I want to use the async operator to insert some stuff into my database, but I want to limit it to 1 request per element (with key=id) at a time. With a regular keyBy / map it's working, but it's too slow (I don't have
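For context, a minimal sketch of the async I/O pattern being discussed; the database call is a placeholder, and note that the capacity argument caps in-flight requests per subtask, not per key:

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.AsyncFunction;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;

    public class AsyncDbLookup implements AsyncFunction<String, String> {

        @Override
        public void asyncInvoke(String id, ResultFuture<String> resultFuture) {
            // queryDatabase stands in for a non-blocking client call.
            CompletableFuture
                    .supplyAsync(() -> queryDatabase(id))
                    .thenAccept(row -> resultFuture.complete(Collections.singleton(row)));
        }

        private static String queryDatabase(String id) {
            return "row-for-" + id; // stand-in for the real lookup
        }

        static DataStream<String> wire(DataStream<String> ids) {
            // orderedWait emits results in input order; the last argument caps
            // the number of concurrent requests per subtask.
            return AsyncDataStream.orderedWait(ids, new AsyncDbLookup(), 5, TimeUnit.SECONDS, 10);
        }
    }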

RemoteEnvironment cannot execute job from local.

2019-10-31 Thread Simon Su
Hi all, I want to test submitting a job from my local IDE, and I deployed a Flink cluster in my VM. Here is my code from the Flink 1.9 documentation, with some of my parameters added. public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment

Re: Sending custom statsd tags

2019-10-31 Thread vino yang
Hi Prakhar, You need to customize StatsDReporter[1] in the Flink source. If you want to flexibly get configurable tags from the configuration file[2], you can refer to the implementation of DatadogHttpReporter#open[3] (for reference only on how to get the tags). Best, Vino [1]:
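A sketch of reading a custom "tags" property, modelled on DatadogHttpReporter#open; the class name and config key are assumptions, and since StatsDReporter keeps its send logic private, actually attaching the tags to each StatsD line may require copying the class rather than subclassing it:

    import org.apache.flink.metrics.MetricConfig;
    import org.apache.flink.metrics.statsd.StatsDReporter;

    // Configured e.g. as:
    //   metrics.reporter.stsd.class: com.example.TaggedStatsDReporter
    //   metrics.reporter.stsd.tags: env:prod,team:data
    public class TaggedStatsDReporter extends StatsDReporter {

        private String[] tags = new String[0];

        @Override
        public void open(MetricConfig config) {
            super.open(config);
            // Read a comma-separated tag list from the reporter config,
            // in the same spirit as DatadogHttpReporter#open.
            String raw = config.getString("tags", "");
            if (!raw.isEmpty()) {
                tags = raw.split(",");
            }
        }
    }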

Re: How to filter out abnormal timestamps?

2019-10-31 Thread Yun Tang
Hi Ruibin, When downstream operators use window operations, the timestamp they rely on is this ingestion time. If your message has a field with "event time" semantics, you can then compare the ingestion time generated at the source against the time that field represents. On 10/31/19, 10:45 AM, "邢瑞斌" wrote: Hi Yun,

Re: Flink SQL + savepoint

2019-10-31 Thread Yun Tang
Hi Fanbin, If you do not change the parallelism or add and remove operators, you can still use a savepoint to resume your jobs with Flink SQL. However, as far as I know, Flink SQL might not configure the uid currently, and I’m pretty sure the blink branch contains this part of setting the uid to stream

Re: Sending custom statsd tags

2019-10-31 Thread Prakhar Mathur
Hi Chesnay, Thanks for the response. Can you point me to some existing example for this? On Wed, Oct 30, 2019 at 5:30 PM Chesnay Schepler wrote: > Not possible, you'll have to extend the StatsDReporter yourself to add > arbitrary tags. > > On 30/10/2019 12:52, Prakhar Mathur wrote: > > Hi, > >