Re: How do I get the value of 99th latency inside an operator?

2020-03-10 Thread Arvid Heise
Hi Felipe, could you use the JMX metrics reporter and tap into the reported values? The proposed hacks are obviously unstable over time. On Fri, Mar 6, 2020 at 1:06 PM Aljoscha Krettek wrote: > Hi, > > I'm afraid you're correct, this is currently not exposed and you would > have to hack around

Re: How to print the aggregated state every time it is updated?

2020-03-10 Thread Arvid Heise
Hi Kant, if you only want to output every second, you probably want to use a ProcessFunction with timers [1]. Basically, this function holds the state and manages updates to it. The updates should also be stored in a local/non-state variable *changes*. Whenever the timer triggers, you would
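The pattern Arvid describes could be sketched roughly as follows. This is a hypothetical illustration, not code from the thread: class and state names are invented, and it throttles emission with one processing-time timer per key per second.

```java
// Hypothetical sketch: keep the aggregate in keyed state, emit it only
// when a one-second timer fires instead of on every input element.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ThrottledSum extends KeyedProcessFunction<String, Long, Long> {

    private transient ValueState<Long> sum;          // the aggregated state
    private transient ValueState<Boolean> timerSet;  // avoids duplicate timers per key

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(
            new ValueStateDescriptor<>("sum", Long.class));
        timerSet = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timerSet", Boolean.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        Long current = sum.value();
        sum.update(current == null ? value : current + value);
        // Register at most one timer per key per second.
        if (timerSet.value() == null) {
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000);
            timerSet.update(true);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        out.collect(sum.value());  // emit the latest aggregate once per second
        timerSet.clear();
    }
}
```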

Re: Question on the SQL "GROUPING SETS" and "CUBE" syntax usability

2020-03-10 Thread Arvid Heise
Hi Weike, according to the linked documentation, the operations are ready, but as you have mentioned only for SQL batch mode, which is not surprising as they don't have well-defined semantics on streams. See also Calcite's explanations [1]. Could you maybe outline your use case and what you'd exp

Re: How to change the flink web-ui jobServer?

2020-03-10 Thread Arvid Heise
Hi LakeShen, you can change the port with conf.setInteger(RestOptions.PORT, 8082); or, if you want to be on the safe side, specify a range: conf.setString(RestOptions.BIND_PORT, "8081-8099"); On Mon, Mar 9, 2020 at 10:47 AM LakeShen wrote: > Hi community, >now I am moving the flink job to
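Arvid's two options could be wired up roughly like this. A hedged sketch: the `createLocalEnvironmentWithWebUI` entry point is just one way to apply the `Configuration`, and may not match LakeShen's actual deployment mode.

```java
// Sketch of configuring the REST/web UI port programmatically, assuming a
// local environment; in cluster deployments the same keys go in flink-conf.yaml.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestPortExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8082);            // fixed port
        conf.setString(RestOptions.BIND_PORT, "8081-8099"); // or a range, to be safe

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // ... build and execute the job with env ...
    }
}
```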

Re: Flink Serialization as stable (kafka) output format?

2020-03-10 Thread Arvid Heise
Hi Theo, I strongly discourage the use of flink serialization for persistent storage of data. It was never intended to work in this way and does not offer Avro's benefits of lazy schema evolution and maturity. Unless you can explicitly measure that Avro is a bottleneck in your setup, stick w

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread Arvid Heise
Hi Kant, according to the documentation [1], you don't need to set a watermark assigner: > Compared to *event time*, *ingestion time* programs cannot handle any > out-of-order events or late data, but the programs don’t have to specify > how to generate *watermarks*. > > Internally, *ingestion ti
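The setup the documentation describes could be sketched as below. This is a minimal illustration using the pre-Flink-1.12 time-characteristic API that was current at the time of this thread:

```java
// Minimal sketch: with ingestion time, sources assign timestamps and emit
// watermarks automatically, so no assignTimestampsAndWatermarks call is needed.
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IngestionTimeExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        // env.addSource(...).keyBy(...).timeWindow(...)
        // — windowed operators work directly, no watermark assigner required.
    }
}
```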

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread Arvid Heise
Hi Kant, I just saw that you asked the same question on SO [1]. Could you, in the future, please cross-reference these posts, so that we don't waste resources on answering? [1] https://stackoverflow.com/questions/60610985/do-i-need-to-set-assigntimestampsandwatermarks-if-i-set-my-time-characteristic-

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread kant kodali
Hi Arvid, If ingestion time programs cannot handle late data, then why would they generate watermarks? Isn't the whole point of watermarks to handle late data? My last question was more about this library I run several algorithms using SimpleEdgeStre

Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
I have configured source table successfully using the following configuration: - name: out_order type: source update-mode: append schema: - name: out_order_code type: STRING - name: input_date type: BIGINT - name: owner_code type: STRING connector:

RocksDB

2020-03-10 Thread Timothy Victor
Can the RocksDB state backend used by Flink be queried from outside, e.g. via SQL? Or maybe a better question, is there a RocksDB SinkFunction that exists? Thanks Tim

Re: Re: Kafka sink only support append mode?

2020-03-10 Thread wangl...@geekplus.com.cn
Hi Jark, Thanks for the explanation. The GROUP BY statement will result in a non-append stream. I have just tried a join statement and want to send the result to kafka, it also has the error: AppendStreamTableSink requires that Table has only insert changes Why the join result is not appenda

Setting app Flink logger

2020-03-10 Thread Eyal Pe'er
Hi, I am running Flink in YARN mode using the official image with few additional files. I've noticed that my logger failed to initialize: root:~# docker logs flink-task-manager Starting taskexecutor as a console application on host ***. log4j:WARN No appenders could be found for logger (org.apac

Re: RocksDB

2020-03-10 Thread Aljoscha Krettek
On 10.03.20 11:36, Timothy Victor wrote: Can the RocksDB state backend used by Flink be queried from outside, e.g. via SQL? That's not possible, but you might be interested in queryable state: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html Or ma

Re: RocksDB

2020-03-10 Thread David Anderson
The State Processor API goes a bit in the direction you are asking about, by making it possible to query savepoints. https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread Arvid Heise
Hi Lei, yes Kafka as a sink is supported albeit only for appends (no deletions/updates yet) [1]. An example is a bit hidden in the documentation [2]: tables: - name: MyTableSink type: sink-table update-mode: append connector: property-version: 1 type: kafka versio
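The sink-table example Arvid references [2] looks roughly like the fragment below. Topic name, bootstrap servers, and schema fields here are placeholders for your own setup, not values from the thread:

```yaml
# Sketch of a Kafka append-only sink table for the SQL Client YAML config;
# all concrete values are illustrative placeholders.
tables:
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: my-output-topic
      properties:
        bootstrap.servers: localhost:9092
    format:
      property-version: 1
      type: json
    schema:
      - name: out_order_code
        data-type: STRING
      - name: input_date
        data-type: BIGINT
```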

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread Aljoscha Krettek
On 10.03.20 10:13, kant kodali wrote: If ingestion time programs cannot handle late data then why would they generate watermarks? Isn't the whole point of watermarks to handle late data? Watermarks are not only used for handling late data. Watermarks are the mechanism that is used to upd

Re: Do I need to set assignTimestampsAndWatermarks if I set my time characteristic to IngestionTime?

2020-03-10 Thread David Anderson
Watermarks are a tool for handling out-of-orderness when working with event time timestamps. They provide a mechanism for managing the tradeoff between latency and completeness, allowing you to manage how long to wait for any out-of-orderness to resolve itself. Note the way that Flink uses these te

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
Thanks, it works now. It seems it was because I added the schema: "ROW(out_order_code STRING,input_date BIGINT, owner_code STRING, status INT)" under the format label. From: Arvid Heise Date: 2020-03-10 20:51 To: wangl...@geekplus.com.cn CC: user Subject: Re: Does flink-1.10 sql-client support kafka

Failure detection and Heartbeats

2020-03-10 Thread Morgan Geldenhuys
Hi community, I am interested in knowing more about the failure detection mechanism used by Flink, unfortunately information is a little thin on the ground and I was hoping someone could shed a little light on the topic. Looking at the documentation (https://ci.apache.org/projects/flink/flin

Re: Setting app Flink logger

2020-03-10 Thread Rafi Aroch
Hi Eyal, Sounds trivial, but can you verify that the file actually exists at /opt/flink/conf/log4j-console.properties? Also, verify that the user running the process has read permissions on that file. You said you use Flink in YARN mode, but the example above you run inside a docker image so t

RE: Setting app Flink logger

2020-03-10 Thread Eyal Pe'er
Hi Rafi, The file exists (and is the file from the official image☺, please see below). The user is root and it has permission. I am running in HA mode using docker. cat /opt/flink/conf/log4j-console.properties # Lic

Re: History server UI not working

2020-03-10 Thread Yadong Xie
Hi pwestermann, I believe this is related to https://issues.apache.org/jira/browse/FLINK-13799 It seems that configuration.features['web-submit'] is missing from the API when upgrading from 1.7 to 1.9.2. Do you have the same problem when upgrading to 1.10? Feel free to ping me if you still

Re: Setting app Flink logger

2020-03-10 Thread miki haiat
Which image are you using ? On Tue, Mar 10, 2020, 16:27 Eyal Pe'er wrote: > Hi Rafi, > > The file exists (and is the file from the official image☺, please see > below). > > The user is root and it has permission. I am running in HA mode using > docker. > > > > cat /opt/flink/conf/log4j-console.p

Is incremental checkpoints needed?

2020-03-10 Thread Eleanore Jin
Hi All, I am using Apache Beam to construct the pipeline, and this pipeline is running with Flink Runner. Both Source and Sink are Kafka topics, I have enabled Beam Exactly once semantics. I believe how it works in beam is: the messages will be cached and not processed by the KafkaExactlyOnceSin

Automatically Clearing Temporary Directories

2020-03-10 Thread David Maddison
Hi, When a TaskManager is restarted it can leave behind unreferenced BlobServer cache directories in the temporary storage that never get cleaned up. Would it be safe to automatically clear the temporary storage every time when a TaskManager is started? (Note: the temporary volumes in use are ded

time-windowed joins and tumbling windows

2020-03-10 Thread Vinod Mehra
Hi! We are testing the following 3 way time windowed join to keep the retained state size small. Using joins for the first time here. It works in unit tests but we are not able to get expected results in production. We are still troubleshooting this issue. Can you please help us review this in cas

Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-10 Thread Andrey Zagrebin
Hi All, Thanks a lot for the feedback! *@Yangze Guo* - Regarding the flink_docker_utils#install_flink function, I think it > should also support build from local dist and build from a > user-defined archive. I suppose you bring this up mostly for development purposes or power users. Most of n

Re: Is incremental checkpoints needed?

2020-03-10 Thread Arvid Heise
Hi Eleanore, incremental checkpointing would be needed if you have a large state (GB-TB), but between two checkpoints only little changes happen (KB-MB). There are two reasons for large state: large user state or large operator state coming from joins, windows, or grouping. In the end, you will s

Re: Is incremental checkpoints needed?

2020-03-10 Thread Eleanore Jin
Hi Arvid, Thank you for the clarification! Best, Eleanore On Tue, Mar 10, 2020 at 12:32 PM Arvid Heise wrote: > Hi Eleanore, > > incremental checkpointing would be needed if you have a large state > (GB-TB), but between two checkpoints only little changes happen (KB-MB). > > There are two rea

Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-10 Thread Thomas Weise
Thanks for working on improvements to the Flink Docker container images. This will be important as more and more users are looking to adopt Kubernetes and other deployment tooling that relies on Docker images. A generic, dynamic configuration mechanism based on environment variables is essential a

scaling issue Running Flink on Kubernetes

2020-03-10 Thread Eleanore Jin
Hi Experts, I have my flink application running on Kubernetes, initially with 1 Job Manager, and 2 Task Managers. Then we have the custom operator that watches for the CRD, when the CRD replicas changed, it will patch the Flink Job Manager deployment parallelism and max parallelism according to th

Re: scaling issue Running Flink on Kubernetes

2020-03-10 Thread Xintong Song
Hi Eleanore, I have a few more questions regarding your issue. - Which Flink version are you using? - Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value? - Keeping the job running a while after the scale-up, does the

Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-10 Thread Jiawei Wu
Hi Robert, Your answer really helps. About the problem, we have 2 choices. The first one is using Flink as described in this email thread. The second one is using AWS Lambda triggered by the CDC stream and computing the latest 15 days of records, which is a workaround solution and looks not as elegant as

Re: Automatically Clearing Temporary Directories

2020-03-10 Thread Yang Wang
Hi David, Currently, the TaskManager can clean up the non-referenced files in the blob cache. It can be configured via `blob.service.cleanup.interval`[1]. Also, when the TaskManager is shut down gracefully, the storage directory will be deleted. So do you stop your TaskManager forcibly (i.e. kill -9)?
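For reference, the option Yang mentions is set in flink-conf.yaml; the value below is illustrative (it matches the documented default of one hour), so check [1] for your version:

```yaml
# flink-conf.yaml: how often the TaskManager's blob cache cleanup runs,
# in seconds. 3600 (one hour) is the documented default; adjust as needed.
blob.service.cleanup.interval: 3600
```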

Re: scaling issue Running Flink on Kubernetes

2020-03-10 Thread Eleanore Jin
Hi Xintong, Thanks for the prompt reply! To answer your question: - Which Flink version are you using? v1.8.2 - Is this skew observed only after a scaling-up? What happens if the parallelism is initially set to the scaled-up value? I also tried this, it

Re: Setting app Flink logger

2020-03-10 Thread Yang Wang
Since you are using log4j2, the Java dynamic property should not be "log4j.configuration". Please use "log4j.configurationFile" instead. Maybe it is not your problem; there is something wrong with the docker image. The log4j2 properties in "flink-console.sh" are not configured correctly. Best, Y

Re: Use flink to calculate sum of the inventory under certain conditions

2020-03-10 Thread Kurt Young
Hi Jiawei, Sorry, I still didn't fully get your question. What's wrong with your proposed SQL? > select vendorId, sum(inventory units) > from dynamodb > where today's time - inbound time > 15 > group by vendorId My guess is that such a query would only trigger calculations on new events. So if a ver

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread Jark Wu
Hi Lei, CREATE TABLE DDL [1][2] is the recommended way to register a table since 1.9. And the yaml way might be deprecated in the future. By using DDL, a registered table can both be used as source and sink. Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/s
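The DDL approach Jark recommends [1][2] looks roughly like the statement below in Flink 1.10's connector-property style. Topic, servers, and columns are placeholders, not values from the thread; the registered table can then be read from or inserted into:

```sql
-- Sketch of a Kafka-backed table registered via DDL; usable as source or sink.
-- All concrete values are illustrative placeholders.
CREATE TABLE out_order (
  out_order_code STRING,
  input_date BIGINT,
  owner_code STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'out_order',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'format.type' = 'json'
);
```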

Re: Re: Kafka sink only support append mode?

2020-03-10 Thread Jark Wu
Hi Lei, Are you trying a regular left join query? Non-time-based operators (e.g. the regular join in your case) will emit results before the input is complete, and the results will be updated as more inputs come in (by emitting upsert/retract messages). But time-based operators (e.g. windowed aggregates, in
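The distinction Jark draws can be illustrated with a time-windowed (interval) join, which stays append-only and can therefore be written to an AppendStreamTableSink such as Kafka. The tables and columns below are hypothetical:

```sql
-- Hypothetical interval join: the time bound on the join condition keeps the
-- result append-only. The same join without the BETWEEN clause would be a
-- regular join and produce an updating (upsert/retract) stream instead.
SELECT o.out_order_code, s.status
FROM Orders o
JOIN Shipments s
  ON o.out_order_code = s.out_order_code
 AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
```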

Re: Question on the SQL "GROUPING SETS" and "CUBE" syntax usability

2020-03-10 Thread Jark Wu
Thanks Arvid for reminding me of this topic. Actually, it is supported in streaming mode in the blink planner (since Flink v1.9), but we missed updating the documentation. You can also find it is supported in the integration tests [1]. I created an issue to update the docs [2]. Best, Jark [1]: https://gi

Re: scaling issue Running Flink on Kubernetes

2020-03-10 Thread Xintong Song
Hi Eleanore, That doesn't sound like a scaling issue. It's probably data skew: the data volume on some of the keys is significantly higher than on others. I'm not familiar with this area though, and have copied Jark for you, who is one of the community experts in this area. Thank you~ Xinton

Re: [DISCUSS] FLIP-111: Docker image unification

2020-03-10 Thread Yangze Guo
Thanks for the reply, Andrey. Regarding building from local dist: - Yes, I bring this up mostly for development purposes. Since k8s is popular, I believe more and more developers would like to test their work on a k8s cluster. I'm not sure whether all developers should write a custom docker file themselves i

Re: Re: Does flink-1.10 sql-client support kafka sink?

2020-03-10 Thread wangl...@geekplus.com.cn
Hi Jark, I have tried to use the CREATE TABLE DDL. First ./bin/sql-client.sh embedded. Then I create a table from a kafka topic and it tells me the table has been created. But when I query with select * from tableName, there's an error: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.a