Re: Investigating Flink
Hi, you can write a custom log appender that modifies the logs before they are sent. Thanks.

Diana El-Masri wrote on Fri, Nov 6, 2020 at 7:47 AM:
> Hi,
>
> No, the logs of the sources connected to Flink.
>
> Thanks
>
> Chesnay Schepler wrote:
>
> > Are you referring to the log files of Flink?
> >
> > On 11/5/2020 7:01 PM, Diana El-Masri wrote:
> >> Hi,
> >>
> >> I am starting my PhD at "Ecole Polytechnique of Montreal" on IoT
> >> log management. I am considering using Flink for my edge-layer
> >> processing. Could you please advise whether it is possible to write
> >> a Flink plugin that intercepts and modifies the logs before they are
> >> sent to the user/cloud? If yes, what is the best way to achieve
> >> this with Flink?
> >> Thanks
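For reference, a minimal sketch of the "custom log appender" idea suggested at the top of this message, written against Log4j 2 (the logging backend bundled with recent Flink distributions). The appender name, plugin wiring and the rewrite logic are illustrative assumptions, not an existing Flink plugin:
{code:java}
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;

import java.io.Serializable;

// Hypothetical appender that rewrites each log event before forwarding it.
@Plugin(name = "ModifyingAppender", category = "Core", elementType = "appender")
public final class ModifyingAppender extends AbstractAppender {

    private ModifyingAppender(String name, Filter filter, Layout<? extends Serializable> layout) {
        super(name, filter, layout, true, Property.EMPTY_ARRAY);
    }

    @PluginFactory
    public static ModifyingAppender createAppender(
            @PluginAttribute("name") String name,
            @PluginElement("Filter") Filter filter,
            @PluginElement("Layout") Layout<? extends Serializable> layout) {
        return new ModifyingAppender(name, filter, layout);
    }

    @Override
    public void append(LogEvent event) {
        // Rewrite or enrich the message before shipping it to the user/cloud sink.
        String modified = "[edge] " + event.getMessage().getFormattedMessage();
        // Forward `modified` to the downstream system of your choice (HTTP, Kafka, ...).
        System.out.println(modified);
    }
}
{code}
The appender is then referenced from the log4j2 configuration of the JobManager/TaskManager like any other appender; no Flink-specific API is involved.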
[jira] [Created] (FLINK-18875) DESCRIBE table can return the table properties
Kaibo Zhou created FLINK-18875:
-----------------------------------

             Summary: DESCRIBE table can return the table properties
                 Key: FLINK-18875
                 URL: https://issues.apache.org/jira/browse/FLINK-18875
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / API
            Reporter: Kaibo Zhou


I created a table A with some properties, and then I want to see the properties when using DESCRIBE `A` or DESCRIBE EXTENDED `A`.


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
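For illustration, a sketch of the desired behavior from the Table API (assuming a Flink version with executeSql; whether the DESCRIBE output includes the WITH properties is exactly what this issue requests, so treat the printed result as hypothetical):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DescribeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        tEnv.executeSql(
            "CREATE TABLE A (id INT, name STRING) WITH ('connector' = 'kafka', 'topic' = 'a')");
        // Desired: the output should include the table schema *and* the WITH (...) properties.
        tEnv.executeSql("DESCRIBE A").print();
    }
}
{code}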
Re: What is the suggested way to validate SQL?
Hi, Jingsong,

Thank you very much for your suggestion. I verified that using `tEnv.sqlUpdate("xxx")` followed by `tEnv.explain(false)` for validation works. But this method needs the connector jar, which is very inconvenient to use.

Hi, Danny,

Many thanks for providing very useful explanations.

The use case is: users first register some source/sink tables and UDFs to a catalog service, and then they write and modify SQL like "insert into sinkTable select * from sourceTable where a>1" in a Web SQL editor. The platform wants to tell the user whether the SQL is valid, including the detailed position if an error occurs. For the insert target table, the platform wants to validate that the table exists and check the field names and field types.

Best,
Kaibo

Danny Chan wrote on Mon, Dec 30, 2019 at 5:37 PM:

> Hi, Kaibo Zhou ~
>
> There are several phases that a SQL text goes through before it becomes an execution graph that can
> be run with the Flink runtime:
>
> 1. SQL parse: parse the SQL text to an AST (SQL node tree)
> 2. SQL node (row type) validation; this includes the tables/schema inference
> 3. SQL-to-rel conversion: convert the SQL node to a RelNode (relational algebra)
> 4. Promote the relational expression with the planner (Volcano or Hep), then convert to execution-convention nodes
> 5. Generate the code and the execution graph
>
> For the first 3 steps, Apache Flink uses Apache Calcite as the
> implementation, which means a SQL text passed to the table environment will
> always go through SQL parse/validation/sql-to-rel conversion.
>
> For example, in a code snippet like tableEnv.sqlQuery("INSERT INTO sinkTable
> SELECT f1,f2 FROM sourceTable"), the query part "SELECT f1,f2 FROM
> sourceTable" is validated.
>
> But you are right: for Flink SQL, an insert statement's target table is not
> validated during the validation phase. Actually we validate the "select"
> clause first, extract the target table identifier, and we validate that the
> schema of the "select" clause and the target table are the same when we invoke
> write to sink (after step 4).
>
> For most of the cases this is okay. Can you share your cases? What kind
> of validation do you want for the insert target table?
>
> We are planning to include the insert target table validation in step 2
> for 2 reasons:
>
> • The computed column validation (stored or virtual)
> • The insert implicit type coercion
>
> But this will come with Flink version 1.11 ~
>
> Best,
> Danny Chan
> On Dec 27, 2019, 5:44 PM +0800, dev@flink.apache.org wrote:
> >
> > "INSERT INTO
> > sinkTable SELECT f1,f2 FROM sourceTable"
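Following the sqlUpdate/explain approach confirmed at the top of this message, a minimal validation sketch against a Flink 1.9/1.10-style TableEnvironment; the helper class and the error-message format are illustrative assumptions:
{code:java}
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;

public final class SqlValidator {

    // Runs parse/validation (and planning via explain) and returns a human-readable result.
    public static String validate(TableEnvironment tEnv, String sql) {
        try {
            tEnv.sqlUpdate(sql);   // SQL parse + validation + sql-to-rel conversion
            tEnv.explain(false);   // forces planning, which also checks the select/sink schemas
            return "OK";
        } catch (SqlParserException | ValidationException e) {
            // The exception message usually contains the error position reported by Calcite,
            // which the Web SQL editor can surface to the user.
            return "Invalid SQL: " + e.getMessage();
        }
    }
}
{code}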
What is the suggested way to validate SQL?
Hi,

As a platform user, I want to integrate Flink SQL with our platform. The usage scenario is: users register tables/UDFs to a catalog service and then write SQL scripts like "insert into xxx select ... from xxx" through a Web SQL editor; the platform needs to validate the SQL script each time the user changes it.

One problem I encountered is that SQL validation depends on the connector jar, which leads to many problems. More details are in the issue [1] I just submitted.

Another problem I found is that when I use `tEnv.sqlUpdate("INSERT INTO sinkTable SELECT f1,f2 FROM sourceTable");` to do SQL validation, it does NOT validate the sinkTable, including its schema and table name.

I am confused: what is the suggested way to validate Flink SQL? Maybe Flink could provide a recommended way to let SQL be easily integrated by external platforms.

[1]: https://issues.apache.org/jira/browse/FLINK-15419

Best,
Kaibo
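As a possible stopgap for the missing sink-table validation described above, a platform could at least check the INSERT target against the registered catalog itself; a rough sketch, with catalog/database names as placeholders:
{code:java}
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.ObjectPath;

public final class SinkTableCheck {

    // Returns true if the INSERT target table is registered in the given catalog and database.
    public static boolean sinkTableExists(
            TableEnvironment tEnv, String catalogName, String dbName, String tableName) {
        return tEnv.getCatalog(catalogName)
                .map(catalog -> catalog.tableExists(new ObjectPath(dbName, tableName)))
                .orElse(false);
    }
}
{code}
This only verifies existence, not the schema compatibility between the SELECT clause and the sink, which is the part the thread reply below explains is checked later during planning.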
[jira] [Created] (FLINK-15419) Validate SQL syntax not need to depend on connector jar
Kaibo Zhou created FLINK-15419:
-----------------------------------

             Summary: Validate SQL syntax not need to depend on connector jar
                 Key: FLINK-15419
                 URL: https://issues.apache.org/jira/browse/FLINK-15419
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / API
            Reporter: Kaibo Zhou
             Fix For: 1.11.0


As a platform user, I want to integrate Flink SQL in my platform.

The users will register Source/Sink Tables and Functions to a catalog service through a UI, and write SQL scripts in a Web SQL editor. I want to validate the SQL syntax and validate that all catalog objects exist (tables, fields, UDFs).

After some investigation, I decided to use the `tEnv.sqlUpdate/sqlQuery` API to do this. `SqlParser` and `FlinkSqlParserImpl` are not a good choice, as they do not read the catalog.

The users have registered a *Kafka* source/sink table in the catalog, so the validation logic will be:
{code:java}
TableEnvironment tEnv = ...;
tEnv.registerCatalog(CATALOG_NAME, catalog);
tEnv.useCatalog(CATALOG_NAME);
tEnv.useDatabase(DB_NAME);

tEnv.sqlUpdate("INSERT INTO sinkTable SELECT f1,f2 FROM sourceTable");
// or
tEnv.sqlQuery("SELECT * FROM tableName")
{code}

It will throw an exception on Flink 1.9.0 because I do not have `flink-connector-kafka_2.11-1.9.0.jar` in my classpath.
{code:java}
org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
  at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
  at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:132)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)

The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
  at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
  at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
  at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
  at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
  at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
{code}

For a platform provider, the user's SQL may depend on *ANY* connector or even a custom connector. It is complicated to dynamically load the connector jar after parsing the connector type in the SQL, and this requires that users upload their custom connector jar before doing a syntax check.

I hope that Flink can provide a friendly way to verify the syntax of SQL whose tables/functions are already registered in the catalog, *NOT* needing to depend on the connector jar. This makes it easier for SQL to be integrated by external platforms.


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (FLINK-13787) PrometheusPushGatewayReporter does not cleanup TM metrics when run on kubernetes
Kaibo Zhou created FLINK-13787:
-----------------------------------

             Summary: PrometheusPushGatewayReporter does not cleanup TM metrics when run on kubernetes
                 Key: FLINK-13787
                 URL: https://issues.apache.org/jira/browse/FLINK-13787
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Metrics
    Affects Versions: 1.8.1, 1.7.2, 1.9.0
            Reporter: Kaibo Zhou


I ran a Flink job on Kubernetes using the PrometheusPushGatewayReporter, and I can see the metrics of the Flink jobmanager and taskmanager in the push gateway's UI.

When I cancel the job, the jobmanager's metrics disappear, but the taskmanager's metrics still exist, even though I have set _deleteOnShutdown_ to true.

The configuration is:
{code:java}
metrics.reporters: "prom"
metrics.reporter.prom.class: "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter"
metrics.reporter.prom.jobName: "WordCount"
metrics.reporter.prom.host: "localhost"
metrics.reporter.prom.port: "9091"
metrics.reporter.prom.randomJobNameSuffix: "true"
metrics.reporter.prom.filterLabelValueCharacters: "true"
metrics.reporter.prom.deleteOnShutdown: "true"
{code}

Other people have also encountered this problem: [https://stackoverflow.com/questions/54420498/flink-prometheus-push-gateway-reporter-delete-metrics-on-job-shutdown]. And another similar issue: [FLINK-11457|https://issues.apache.org/jira/browse/FLINK-11457].

As Prometheus is a very important metrics system on Kubernetes, solving this problem would make it much easier for users to monitor their Flink jobs.


--
This message was sent by Atlassian Jira
(v8.3.2#803003)
Re: [DISCUSS] Best practice to run flink on kubernetes
Thanks for bringing this up. Obviously, options 2 and 3 are both useful for Flink users on Kubernetes. But option 3 is easier for users who are not familiar with many Kubernetes concepts; they can start Flink on Kubernetes quickly, so I think it should have a higher priority.

I have spent some time integrating Flink with our Kubernetes-based platform, and have some concerns about option 3 from the platform user's perspective.

First, I think users can be divided into common users and downstream platform users. For common users, kubernetes-session.sh (or yarn-session.sh) is convenient: just run the shell script, get the jobmanager address, and then run ./bin/flink to submit a job. But for platform users, the shell scripts are not friendly to integrate. I need to use a Java ProcessBuilder to run a shell script and redirect the stdout/stderr, parse the stdout log to get the jobId, handle the exit code, and add some idempotence logic to avoid duplicate job submissions (see the sketch after this message).

The way our platform integrates with Flink on k8s is:

1. Generate a job id and prepare jobmanager/taskmanager/service/configmap resource files. In the jobmanager and taskmanager resource files, we define an initContainer to download the user jar from http/hdfs/s3..., so the user jar is already on the jm and tm pods before they start. StandaloneJobClusterEntryPoint accepts "--job-id" to pass the pre-generated jobId and "--job-classname" to pass the user jar entry class and other args [1].
2. Submit the resource files to k8s directly, and that is all. No other steps are needed (e.g. uploading/submitting the jar to Flink), and k8s guarantees idempotence naturally: the same resources will be ignored.
3. Just use the pre-configured job id to query status; the platform knows the job id.

The above steps are convenient for platform users. So my concerns for option 3 are:

1. Besides using kubernetes-session.sh to submit a job, can we retain the ability to let users submit k8s resource files directly, instead of forcing them to submit jobs from shell scripts? As you know, everything in Kubernetes is a resource, and submitting a resource to Kubernetes is more natural.
2. Retain the ability to pass job-classname to start a Flink job cluster, so platform users do not need a separate step to submit the jar, whether from ./bin/flink or from the REST API. And for a Flink session cluster, platform users can submit Kubernetes resource files to start the cluster and then submit jar jobs from the REST API to avoid calling the shell scripts.
3. Retain the ability to pass job-id. It is not convenient to find which job id you have just submitted, whether by parsing the submit log or by querying the jobmanager REST API. And it is impossible to find the jobId in the session cluster scenario, where there may be many jobs with the same name and the same submit time.

I think it is better to retain these features already provided by the StandaloneJobClusterEntryPoint in option 3. This will make Flink easier to integrate with other platforms based on Kubernetes.

Thanks
Kaibo

[1]. https://github.com/apache/flink/blob/master/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java#L45

Jeff Zhang wrote on Sat, Aug 10, 2019 at 1:52 PM:

> Thanks Yang. K8s native integration is very necessary and important for
> the adoption of Flink IMO.
> I notice that the design doc was written in 2018; are there any changes or
> updates?
>
> >>> Download the flink release binary and create the ~/.kube/config file
> >>> corresponding to the k8s cluster. It is all that you need.
>
> How can I specify which k8s cluster to run on in case I have multiple k8s
> clusters? Can I do it by specifying the Flink cluster in the Flink CLI?
>
> Yang Wang wrote on Fri, Aug 9, 2019 at 9:12 PM:
>
> > Hi all,
> >
> > Currently cloud-native architectures have been introduced to many companies
> > in production. They use Kubernetes to run deep learning, web servers, etc.
> > If we could deploy the per-job/session Flink cluster on Kubernetes to make
> > it run alongside other workloads, the cluster resource utilization will be
> > better. Also, many Kubernetes users could more easily have a taste of Flink.
> >
> > By now we have three options to run Flink jobs on k8s.
> >
> > [1]. Create jm/tm/service yaml and apply it, then you will get a Flink
> > standalone cluster on k8s. Use flink run to submit a job to the existing
> > Flink cluster. Some companies may have their own deploy system to manage
> > the Flink cluster.
> >
> > [2]. Use flink-k8s-operator to manage multiple Flink clusters, including
> > session and per-job. It could manage the complete deployment lifecycle of
> > the application. I think this option is really easy to use for k8s
> > users. They are familiar with k8s operators, kubectl and other k8s tools.
> > They could debug and run the Flink cluster just like other k8s
> > applications.
> >
> > [3]. Native integration with k8s: use flink run or
> > kubernetes-session.sh
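The ProcessBuilder integration mentioned above roughly looks like the following sketch; the CLI arguments and the "Job has been submitted with JobID ..." log line are assumptions about the flink run output and may differ between Flink versions:
{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class ShellSubmit {

    // Matches the job id printed by the Flink CLI; the exact wording is an assumption.
    private static final Pattern JOB_ID =
            Pattern.compile("Job has been submitted with JobID ([0-9a-f]{32})");

    public static String submit(String... command) throws Exception {
        Process process = new ProcessBuilder(command).redirectErrorStream(true).start();
        String jobId = null;
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Scrape the job id out of the script's stdout.
                Matcher m = JOB_ID.matcher(line);
                if (m.find()) {
                    jobId = m.group(1);
                }
            }
        }
        if (process.waitFor() != 0 || jobId == null) {
            throw new IllegalStateException("flink run failed or no job id found in its output");
        }
        return jobId;
    }
}
{code}
A caller would invoke something like ShellSubmit.submit("./bin/flink", "run", "-d", "/path/to/user-job.jar"); the fragility of this scraping is the argument for keeping the resource-file and --job-id based submission path.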
Re: [DISCUSS] FLIP-35: Support Chinese Documents & Website and Improve Translation Specification
+1 for the FLIP!

More and more Flink users from China have recently requested JIRA permissions. The proposed translation specification will make it easier for them to participate, improve the quality of the translation, and ensure consistency of style.

Best,
Kaibo

Jark Wu wrote on Sun, Feb 17, 2019 at 10:01 PM:

> Hi all,
>
> In the past few weeks, we started a discussion thread [1] about proposing
> to support Chinese documents and website.
> I'm glad to have received a lot of positive and valuable feedback so far. I
> converted the design doc into an actual FLIP
> wiki to make the plan more accessible for the community. You can find it
> here:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-35%3A+Support+Chinese+Documents+and+Website
>
> One more thing I want to highlight is that we proposed an initial
> "Translation Specification and Glossary" in the FLIP:
>
> https://docs.google.com/document/d/1zhGyPU4bVJ7mCIdUTsQr9lHLn4vltejhngG_SOV6g1w
>
> The proposed translation specification is written in Chinese (we can
> convert it into other languages if needed);
> it includes typesetting, style, terminology translation and some tips to
> improve translation quality.
> The purpose is to improve the translation quality, reduce the workload of
> reviewers, keep the style consistent, and make the reader's reading
> experience better.
> I encourage more people to jump in to polish the specification, reach a
> consensus on it, and then follow the specification as much as possible.
>
> I would propose to convert it into a WIKI page when it is accepted, and
> link it from the contribute-documentation page [2].
> Btw, the translation specification is a derivative of the "Glossary of
> Translation" [3] by Mozilla, licensed under CC-BY-SA 2.5 [4],
> so I'm not sure if we can include it in our project WIKI page.
>
> I also want to keep this as a long-term thread to discuss problems and
> improvements in the Translation Specification,
> because we cannot cover all the glossary translation in a short time.
>
> Feel free to join the discussions and provide feedback.
>
> Regards,
> Jark
>
> [1]:
> https://lists.apache.org/thread.html/1d079e7218b295048c990edcb9891b5935d02eeac0927c89696aad42@%3Cdev.flink.apache.org%3E
> [2]: https://flink.apache.org/contribute-documentation.html
> [3]: https://developer.mozilla.org/zh-CN/docs/Glossary_of_translation
> [4]:
> https://developer.mozilla.org/en-US/docs/MDN/About#Copyrights_and_licenses
Re: [jira] [Commented] (FLINK-8353) Add support for timezones
Time zone is a very useful feature. I think there are three levels of time zone settings (priority from high to low):

1. Connectors: for example, the time zone of the time field in the Kafka data.
2. Job level: specifies which time zone the current job uses, perhaps specified by TableConfig or StreamQueryConfig (a sketch follows this message).
3. The system default, effective for all jobs, which can be specified in flink-conf.yaml.

2018-05-29 22:52 GMT+08:00 Weike Dong (JIRA):

> [ https://issues.apache.org/jira/browse/FLINK-8353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493639#comment-16493639 ]
>
> Weike Dong commented on FLINK-8353:
> -----------------------------------
>
> I strongly support these features; preferably there could be a way to set
> a specific timezone for a particular job, so that all the subsequent
> temporal processing could be based on that. As users' input data are often
> collected from other systems that do not follow the rules set by Flink
> (UTC+0), currently some temporal UDFs are needed to perform such
> transformations, which adds complexity to the whole system, especially
> in the case of watermark generation or output of processing time into an
> external database, etc.
>
> > Add support for timezones
> > -------------------------
> >
> >                 Key: FLINK-8353
> >                 URL: https://issues.apache.org/jira/browse/FLINK-8353
> >             Project: Flink
> >          Issue Type: New Feature
> >          Components: Table API SQL
> >            Reporter: Timo Walther
> >            Priority: Major
> >
> > This is an umbrella issue for adding support for timezones in the Table
> > & SQL API.
> > Usually companies work with different timezones simultaneously. We could
> > add support for the new time classes introduced with Java 8 and enable our
> > scalar functions to also work with those (or some custom time class
> > implementations like those from Calcite). We need a good design for this to
> > address most of the problems users face related to timestamps and timezones.
> > It is up for discussion how to ship date, time, and timestamp instances
> > through the cluster.
>
> --
> This message was sent by Atlassian JIRA
> (v7.6.3#76005)
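As an illustration of the job-level option (2), a sketch of what a per-job setting could look like; the setLocalTimeZone call shown here is written against a newer Table API and is an example of the idea rather than an API that existed at the time of this thread:
{code:java}
import java.time.ZoneId;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JobTimeZoneExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Job-level time zone, overriding the cluster-wide default from flink-conf.yaml.
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
    }
}
{code}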
[jira] [Created] (FLINK-7209) Support DataView in Java and Scala Tuples and case classes or as the accumulator of AggregateFunction itself
Kaibo Zhou created FLINK-7209:
----------------------------------

             Summary: Support DataView in Java and Scala Tuples and case classes or as the accumulator of AggregateFunction itself
                 Key: FLINK-7209
                 URL: https://issues.apache.org/jira/browse/FLINK-7209
             Project: Flink
          Issue Type: Improvement
            Reporter: Kaibo Zhou


Support DataView in Java and Scala Tuples and case classes, or as the accumulator of AggregateFunction itself.

e.g.
{code}
public class MyAgg extends AggregateFunction<String, Tuple2<String, MapView>>
{code}
or
{code}
case class MyAcc(str: String, map: MapView)
{code}


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
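To make the requested usage concrete, a hedged Java sketch of what the feature would allow; the aggregate logic is illustrative only, since a MapView inside a Tuple accumulator is exactly what this issue says is not yet supported:
{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

public class MyAgg extends AggregateFunction<String, Tuple2<String, MapView<String, Integer>>> {

    @Override
    public Tuple2<String, MapView<String, Integer>> createAccumulator() {
        // A MapView used directly inside a Tuple2 accumulator -- the support this issue requests.
        return Tuple2.of("", new MapView<>());
    }

    // Called by the framework for every input row.
    public void accumulate(Tuple2<String, MapView<String, Integer>> acc, String value) throws Exception {
        acc.f0 = value;
        acc.f1.put(value, 1);
    }

    @Override
    public String getValue(Tuple2<String, MapView<String, Integer>> acc) {
        return acc.f0;
    }
}
{code}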
[jira] [Created] (FLINK-7208) Refactor build-in agg(MaxWithRetractAccumulator and MinWithRetractAccumulator) using the DataView
Kaibo Zhou created FLINK-7208:
----------------------------------

             Summary: Refactor build-in agg(MaxWithRetractAccumulator and MinWithRetractAccumulator) using the DataView
                 Key: FLINK-7208
                 URL: https://issues.apache.org/jira/browse/FLINK-7208
             Project: Flink
          Issue Type: Improvement
            Reporter: Kaibo Zhou


Refactor build-in agg(MaxWithRetractAccumulator and MinWithRetractAccumulator) using the DataView.


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (FLINK-7207) Support getAccumulatorType when use DataView
Kaibo Zhou created FLINK-7207:
----------------------------------

             Summary: Support getAccumulatorType when use DataView
                 Key: FLINK-7207
                 URL: https://issues.apache.org/jira/browse/FLINK-7207
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
            Reporter: Kaibo Zhou


Users can provide getAccumulatorType when using MapView or ListView in the accumulator.


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (FLINK-7206) Implementation of DataView to support state access for UDAGG
Kaibo Zhou created FLINK-7206:
----------------------------------

             Summary: Implementation of DataView to support state access for UDAGG
                 Key: FLINK-7206
                 URL: https://issues.apache.org/jira/browse/FLINK-7206
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
         Environment: Implementation of MapView and ListView to support state access for UDAGG.
            Reporter: Kaibo Zhou
            Assignee: Kaibo Zhou


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (FLINK-6955) Add operation log for Table
Kaibo Zhou created FLINK-6955:
----------------------------------

             Summary: Add operation log for Table
                 Key: FLINK-6955
                 URL: https://issues.apache.org/jira/browse/FLINK-6955
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
            Reporter: Kaibo Zhou
            Assignee: Kaibo Zhou


In some actual production scenarios, the operations on a Table are very complicated and go through a number of steps. We hope to record the operations performed on a Table and be able to print them out.

e.g.:
{code}
val table1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
val table2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)

val unionDs = table1.unionAll(table2.select('a, 'b, 'c)).filter('b < 2).select('c)
val results = unionDs.toDataStream[Row]

val result = tEnv.getLog
val expected =
  "UnnamedTable$1 = UnnamedTable$0.select('a, 'b, 'c)\n" +
  "UnnamedTable$5 = UnnamedTable$2.unionAll(UnnamedTable$1)\n" +
  "  .filter('b < 2)\n" +
  "  .select('c)\n"
assertEquals(expected, result)
{code}


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (FLINK-6544) Expose Backend State Interface for UDAGG
Kaibo Zhou created FLINK-6544:
----------------------------------

             Summary: Expose Backend State Interface for UDAGG
                 Key: FLINK-6544
                 URL: https://issues.apache.org/jira/browse/FLINK-6544
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
            Reporter: Kaibo Zhou


Currently, UDAGG users cannot access state; it is necessary to provide users with a convenient and efficient way to access state within the UDAGG.

This is the design doc: https://docs.google.com/document/d/1g-wHOuFj5pMaYMJg90kVIO2IHbiQiO26nWscLIOn50c/edit#


--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Created] (FLINK-6355) TableEnvironment support register TableFunction
Kaibo Zhou created FLINK-6355:
----------------------------------

             Summary: TableEnvironment support register TableFunction
                 Key: FLINK-6355
                 URL: https://issues.apache.org/jira/browse/FLINK-6355
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
            Reporter: Kaibo Zhou
            Assignee: Kaibo Zhou


Currently, TableEnvironment only supports registering ScalarFunction; it is necessary to also support registering a TableFunction written in Java.

The motivation is to eliminate most of the differences between the batch and stream execution environments, and to use TableEnvironment only to write programs.


--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
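For illustration, a sketch of a Java TableFunction and the registration call this issue asks TableEnvironment to support; the tEnv.registerFunction usage in the comments below is the desired API rather than something that worked on TableEnvironment at the time:
{code:java}
import org.apache.flink.table.functions.TableFunction;

// A simple split function, illustrative only.
public class Split extends TableFunction<String> {
    public void eval(String line) {
        for (String token : line.split(",")) {
            collect(token);
        }
    }
}

// Desired usage (hypothetical on TableEnvironment at the time this issue was filed):
//   tEnv.registerFunction("split", new Split());
//   tEnv.sqlQuery("SELECT t.word FROM MyTable, LATERAL TABLE(split(line)) AS t(word)");
{code}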