[jira] [Created] (FLINK-27660) Table API support create function using custom jar
dalongliu created FLINK-27660: - Summary: Table API support create function using custom jar Key: FLINK-27660 URL: https://issues.apache.org/jira/browse/FLINK-27660 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.16.0 Reporter: dalongliu Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27659) Planner support to use jar which is registered by "USING JAR" syntax
dalongliu created FLINK-27659: - Summary: Planner support to use jar which is registered by "USING JAR" syntax Key: FLINK-27659 URL: https://issues.apache.org/jira/browse/FLINK-27659 Project: Flink Issue Type: Sub-task Components: Table SQL / API, Table SQL / Planner Affects Versions: 1.16.0 Reporter: dalongliu Fix For: 1.16.0
[jira] [Created] (FLINK-27658) Introduce MutableURLClassLoader to allow registering and removing user jars dynamically
dalongliu created FLINK-27658: - Summary: Introduce MutableURLClassLoader to allow registering and removing user jars dynamically Key: FLINK-27658 URL: https://issues.apache.org/jira/browse/FLINK-27658 Project: Flink Issue Type: Sub-task Affects Versions: 1.16.0 Reporter: dalongliu Fix For: 1.16.0
Re: Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric
Hi Yuan! > How about abstracting the LookupCache to a higher level with a common Cache? Can you give an example of such upper-level Cache usage? It's not clear to me currently. I think it's unnecessary to have such a high-level abstraction if nowhere in the code we operate with objects as instances of Cache. But maybe there are other opinions on this. > Does it have any metrics, such as NumCachedRecords for the AllCache? I think there won't be many problems with supporting metrics in the ALL cache. Moreover, some of the proposed metrics are especially useful in the ALL case, for example 'latestLoadTimeGauge' or 'numCachedRecords', so the necessary metrics should definitely be supported in this cache strategy. Best regards, Alexander On Sun, May 15, 2022 at 20:17, zst...@163.com wrote: > Hi Qingsheng and devs, > > Thanks for your heated discussion and redesign to optimize this feature. I just have two comments: > > 1. How about abstracting the LookupCache to a higher level with a common Cache? It will be convenient for devs to use in other places. > > 2. Does it have any metrics, such as NumCachedRecords for the AllCache? > > Best regards, > Yuan > > At 2022-05-13 20:27:44, "Qingsheng Ren" wrote: > >Hi Alexander and devs, > > > >Thank you very much for the in-depth discussion! As Jark mentioned, we were > >inspired by Alexander's idea and refactored our design. FLIP-221 [1] > >has been updated to reflect our design now and we are happy to hear > >more suggestions from you! > > > >Compared to the previous design: > >1. The lookup cache serves at the table runtime level and is integrated as a > >component of LookupJoinRunner as discussed previously. > >2. Interfaces are renamed and re-designed to reflect the new design. > >3. We separate the all-caching case individually and introduce a new > >RescanRuntimeProvider to reuse the ability of scanning. We are planning to > >support SourceFunction / InputFormat for now considering the complexity of > >the FLIP-27 Source API. 
> >4. A new interface LookupFunction is introduced to make the semantics of > >lookup more straightforward for developers. > > > >For replying to Alexander: > >> However I'm a little confused whether InputFormat is deprecated or not. > >Am I right that it will be so in the future, but currently it's not? > >Yes, you are right. InputFormat is not deprecated for now. I think it will > >be deprecated in the future but we don't have a clear plan for that. > > > >Thanks again for the discussion on this FLIP and looking forward to > >cooperating with you after we finalize the design and interfaces! > > > >[1] > >https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric > > > >Best regards, > > > >Qingsheng > > > > > >On Fri, May 13, 2022 at 12:12 AM Александр Смирнов > >wrote: > > > >> Hi Jark, Qingsheng and Leonard! > >> > >> Glad to see that we came to a consensus on almost all points! > >> > >> However, I'm a little confused whether InputFormat is deprecated or > >> not. Am I right that it will be so in the future, but currently it's > >> not? Actually, I also think that for the first version it's OK to use > >> InputFormat in the ALL cache implementation, because supporting rescan > >> ability seems like a very distant prospect. But for this decision we > >> need a consensus among all discussion participants. > >> > >> In general, I don't have anything to argue with in your statements. All > >> of them correspond to my ideas. Looking ahead, it would be nice to work > >> on this FLIP cooperatively. I've already done a lot of work on lookup > >> join caching with an implementation very close to the one we are discussing, > >> and want to share the results of this work. Anyway, looking forward to > >> the FLIP update! > >> > >> Best regards, > >> Smirnov Alexander > >> > >> On Thu, May 12, 2022 at 17:38, Jark Wu wrote: > >> > > >> > Hi Alex, > >> > > >> > Thanks for summarizing your points. 
> >> > > >> > In the past week, Qingsheng, Leonard, and I have discussed it several > >> > times and we have totally refactored the design. > >> > I'm glad to say we have reached a consensus on many of your points! > >> > Qingsheng is still working on updating the design docs, which may be > >> > available in the next few days. > >> > I will share some conclusions from our discussions: > >> > > >> > 1) We have refactored the design towards the "cache in framework" approach. > >> > > >> > 2) A "LookupCache" interface for users to customize, and a default > >> > implementation with a builder for ease of use. > >> > This makes it possible to have both flexibility and conciseness. > >> > > >> > 3) Filter pushdown is important for both the ALL and LRU lookup caches, > >> > especially for reducing IO. > >> > Filter pushdown should be the final state and the unified way to > >> > support pruning both the ALL cache and the LRU cache, > >> > so I think we should put effort in this direction. If we need to support > >> > filter pushdown for ALL cache anywa
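Point 2) above — a customizable cache interface plus a default builder-based implementation — can be illustrated with a small sketch in plain Java. This is hypothetical, not the final FLIP-221 API (which operates on Flink's RowData rather than generics); the class and method names here are assumptions:

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a size-bounded LRU lookup cache, roughly what a
// default LookupCache implementation might wrap; not actual FLIP-221 code.
public class LruLookupCache<K, V> {
    private final Map<K, Collection<V>> cache;

    public LruLookupCache(final int maxRows) {
        // accessOrder=true makes LinkedHashMap iterate in least-recently-used
        // order, and removeEldestEntry caps the cache at maxRows entries.
        this.cache = new LinkedHashMap<K, Collection<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Collection<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    /** Returns the cached rows for a key, or null on a cache miss. */
    public Collection<V> getIfPresent(K key) {
        return cache.get(key);
    }

    /** Caches the looked-up rows for a key. */
    public void put(K key, Collection<V> rows) {
        cache.put(key, rows);
    }

    /** Number of keys currently cached (a 'numCachedRecords'-style metric). */
    public long size() {
        return cache.size();
    }
}
```

A builder on top of such a class could expose the eviction policy, maximum size, and TTL options mentioned in the thread, while the interface alone keeps full flexibility for custom implementations.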
Re: [DISCUSS] FLIP-91: Support SQL Client Gateway
Hi, Jark, Timo. Nice to have an agreement! Thanks for Jark's input about multi-version Flink support. I have already updated the rejected-alternatives section of the FLIP with the details. 1. We should definitely just use LogicalTypeJsonSerializer and not a second JSON representation. Our concern is mainly that the flexible structure makes it hard for users to consume: the LogicalTypeJsonSerializer will serialize a VARCHAR to either "VARCHAR()" or "{\"TYPE\": \"VARCHAR\", \"LENGTH\": 0}", which requires the end users to handle both cases. But in some cases, users just print the JSON to the terminal/web UI. WDYT? > Serialize the RowData Sure. I will keep your advice in mind. I think the current serialization of the RowData does not use the column name as the object key in the JSON. I am not sure whether I missed something; it would be nice if you could give me an example if I've got something wrong. > Have you also thought about using Flink's state types from Flink tasks/jobs? Yes, but I still think we should use a new state machine. First of all, an Operation in the FLIP is quite different from a Job: Operations include DDL, DML and so on, so it's not suitable to use the smaller concept to replace the bigger one. Actually, some statuses in JobStatus, e.g. RESTARTING/SUSPENDED/RECONCILING, don't apply to a DDL Operation. On the other hand, the Gateway allows users to submit jobs (DML) in sync/async mode, and the RUNNING status of an Operation means something different in each mode: - In async mode, the state moves to FINISHED once the gateway has submitted the job. - In sync mode, the RUNNING status covers both submitting the job and the job running. Even if a failover occurs, we still consider the Operation to be in the RUNNING state; only if the job is unrecoverable do we change the Operation status to ERROR. Therefore, I think these two concepts are not consistent and we should not reuse the JobStatus. 
I have added a section in the rejected alternatives. > Options to configure the REST endpoint Yes. I have modified the FLIP accordingly. > Naming convention Yes. I have modified the FLIP with your suggestions. > Another smaller shortcomings in the FLIP >> SQLGatewayService.getFunction / UserDefinedFunctionInfo After reviewing the javadoc of java.sql.DatabaseMetaData#getFunctions, I find it returns the system and user functions available in the Catalog. I think you are right. Therefore, we'd better rename it to listFunctions(SessionHandle sessionHandle, OperationHandle operationHandle, String catalog, String database, ShowFunctionsOperation.FunctionScope), returning FunctionInfo. >> SQLGatewayService.getGatewayInfo()/getSessionConfig The results of SQLGatewayService.getGatewayInfo and getSessionConfig are not used by the endpoint; the endpoint just serializes the result and presents it to the users. If we used ReadableConfig, it would be hard for us to iterate over all the key-value pairs. > configure_session VS initialize_session >> If calling it initialize_session, should we limit it only being called once? If we limited it to being called once, the input would have to be the whole initialization script. But the current design in the Gateway is aligned with TableEnvironment#executeSql, i.e. the input is a single statement rather than a script. Considering the API in the FLIP is not the same as the initialization in the CLI, I think we can use configure_session. What do you think, Timo? Best, Shengkai On Mon, May 16, 2022 at 14:28, Timo Walther wrote: > Hi Shengkai, Hi Jark, > > thanks for the additional explanation and the update of the FLIP. This > will help us in the future for documenting our decisions. The arguments > for including the Gateway in the main repo make a lot of sense to me. > Especially because both the CLI and the gateway need some parsing functionality > that depends on the current state of the SQL syntax. 
> > Here is my last set of feedback, other than that +1 for the proposal: > > Serialize the LogicalType > > The FLIP mentions LogicalTypeJsonSerializer but the shown JSON is > different from the current master. We are using the serializable > representation of LogicalType as much as possible nowadays. We should > definitely just use LogicalTypeJsonSerializer and not a second JSON > representation. > > 1) Serialize the RowData > > Side note for serializing ROWs: we should not use field names as JSON > object keys, since e.g. `null` and other names with special characters > cause issues in JSON. > > 2) We propose the state machine like HiveServer2 > > Have you also thought about using Flink's state types from Flink > tasks/jobs? If we were using Flink types directly, it would be easier to > monitor the execution of an INSERT INTO job via the gateway without > having to map state types. Monitoring jobs is the most important > functionality and should be in sync with regular Flink job monitoring. A > HiveServer2 endpoint can still p
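The distinction Shengkai draws between an Operation's status and Flink's JobStatus can be made concrete with a small sketch. This is illustrative only — the state names follow the HiveServer2-style machine mentioned in the thread, and the exact state set and transition rules in FLIP-91 may differ:

```java
// Illustrative sketch of a gateway Operation state machine (names follow the
// HiveServer2-style model discussed in the thread; the actual FLIP-91 states
// and transition rules are assumptions here, not the final design).
public enum OperationStatus {
    INITIALIZED, PENDING, RUNNING, FINISHED, CANCELED, CLOSED, ERROR, TIMEOUT;

    /** Terminal states: no transitions are allowed out of them. */
    public boolean isTerminal() {
        return this == FINISHED || this == CANCELED || this == CLOSED
                || this == ERROR || this == TIMEOUT;
    }

    /**
     * A transition is only legal from a non-terminal state. Note there is no
     * RESTARTING/SUSPENDED/RECONCILING here: a sync-mode Operation stays
     * RUNNING across job failovers, as described in the mail above.
     */
    public static boolean isValidTransition(OperationStatus from, OperationStatus to) {
        return !from.isTerminal() && from != to;
    }
}
```

Under this model, async mode reaches FINISHED right after submission, while sync mode stays RUNNING until the underlying job ends, which is exactly why the thread argues the two machines should not be merged.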
[jira] [Created] (FLINK-27657) Implement remote operator state backend in PyFlink
Juntao Hu created FLINK-27657: - Summary: Implement remote operator state backend in PyFlink Key: FLINK-27657 URL: https://issues.apache.org/jira/browse/FLINK-27657 Project: Flink Issue Type: Sub-task Components: API / Python Affects Versions: 1.15.0 Reporter: Juntao Hu Fix For: 1.16.0 This is for supporting broadcast state; the existing map state implementation and caching handler can be reused.
Re: [DISCUSS] FLIP-231: Introduce SupportStatisticReport to support reporting statistics from source connectors
Hi Godfrey, Thanks for your reply. Sounds good to me. > I think we should also introduce a config option We can add this option to the FLIP. I prefer an option for the FileSystem connector, maybe an enum. Best, Jingsong On Tue, May 17, 2022 at 10:31 AM godfrey he wrote: > Hi Jingsong, > > Thanks for the feedback. > > > >One concern I have is that we read the footer for each file, and this may > >be a bit costly in some cases. Is it possible for us to have some > >hierarchical way > Yes, if there are thousands of ORC/Parquet files, it may take a long time. > So we can introduce a config option to let the user choose the > granularity of the statistics. > But SIZE will not be introduced, because the planner does not use > file size statistics now. > We can introduce it once file size statistics are introduced in the future. > I think we should also introduce a config option to enable/disable > SupportStatisticReport, > because it's a heavy operation for some connectors in some cases. > > > is the filter pushdown already happening at > > this time? > That's a good point. Currently, the filter push down happens after partition > pruning to prevent the filter push down rule from consuming the partition > predicates. > The statistics will be set to unknown if a filter is pushed down now. > To combine them all, we can create an optimization program after the filter > push down program to collect the statistics. This could avoid collecting > statistics multiple times. > > > Best, > Godfrey > > On Fri, May 13, 2022 at 22:44, Jingsong Li wrote: > > > > Thanks, Godfrey, for driving this. > > > > Looks very good~ This will undoubtedly greatly enhance the various batch > > mode connectors. > > > > I left some comments: > > > > ## FileBasedStatisticsReportableDecodingFormat > > > > One concern I have is that we read the footer for each file, and this may > > be a bit costly in some cases. Is it possible for us to have some > > hierarchical way, e.g. > > - No statistics are collected for files by default. 
> > - SIZE: Generate statistics based on file size; get the size of the file > > with only access to the master of the FileSystem. > > - DETAILED: Get the complete statistics by format, possibly by accessing > > the footer of the file. > > > > ## When to use the statistics reported by the connector > > > > > When partitions are pruned by PushPartitionIntoTableSourceScanRule, the > > statistics should also be updated. > > > > I understand that we definitely need to use the reporter after the partition > > prune, but another question: is the filter pushdown already happening at > > this time? > > Can we make sure that in the following three cases, both the filter > > pushdown and the partition prune happen before the stats reporting? > > - only partition prune happens > > - only filter pushdown happens > > - both filter pushdown and partition prune happen > > > > Best, > > Jingsong > > > > On Fri, May 13, 2022 at 6:57 PM godfrey he wrote: > > > > > Hi all, > > > > > > I would like to open a discussion on FLIP-231: Introduce > > > SupportStatisticReport > > > to support reporting statistics from source connectors. > > > > > > Statistics are one of the most important inputs to the optimizer. > > > Accurate and complete statistics allow the optimizer to be more powerful. > > > Currently, the statistics of Flink SQL come from the Catalog only, > > > while many connectors have the ability to provide statistics, e.g. > > > FileSystem. > > > In production, we find many tables in the Catalog do not have any statistics. > > > As a result, the optimizer can't generate better execution plans, > > > especially for batch jobs. > > > > > > There are two approaches to enhance statistics for the planner: > > > one is to introduce the "ANALYZE TABLE" syntax which will write > > > the analyzed result to the catalog, another is to introduce a new > > > connector interface > > > which allows the connector itself to report statistics directly to the > > > planner. 
> > > The second one is a supplement to the catalog statistics. > > > > > > Here, we will discuss the second approach. Compared to the first one, > > > the second one gets statistics in real time, with no need to run an > > > analysis job for each table. This could help improve the user > > > experience. > > > (We will also introduce the "ANALYZE TABLE" syntax in another FLIP.) > > > > > > You can find more details in the FLIP-231 document[1]. Looking forward to > > > your feedback. > > > > > > [1] > > > > https://cwiki.apache.org/confluence/pages/resumedraft.action?draftId=211883860&draftShareId=eda17eaa-43f9-4dc1-9a7d-3a9b5a4bae00&; > > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-231 > > > > > > > > > Best, > > > Godfrey > > > >
Re: [DISCUSS] FLIP-231: Introduce SupportStatisticReport to support reporting statistics from source connectors
Hi Jingsong, Thanks for the feedback. >One concern I have is that we read the footer for each file, and this may >be a bit costly in some cases. Is it possible for us to have some >hierarchical way Yes, if there are thousands of ORC/Parquet files, it may take a long time. So we can introduce a config option to let the user choose the granularity of the statistics. But SIZE will not be introduced, because the planner does not use file size statistics now. We can introduce it once file size statistics are introduced in the future. I think we should also introduce a config option to enable/disable SupportStatisticReport, because it's a heavy operation for some connectors in some cases. > is the filter pushdown already happening at > this time? That's a good point. Currently, the filter push down happens after partition pruning to prevent the filter push down rule from consuming the partition predicates. The statistics will be set to unknown if a filter is pushed down now. To combine them all, we can create an optimization program after the filter push down program to collect the statistics. This could avoid collecting statistics multiple times. Best, Godfrey On Fri, May 13, 2022 at 22:44, Jingsong Li wrote: > > Thanks, Godfrey, for driving this. > > Looks very good~ This will undoubtedly greatly enhance the various batch > mode connectors. > > I left some comments: > > ## FileBasedStatisticsReportableDecodingFormat > > One concern I have is that we read the footer for each file, and this may > be a bit costly in some cases. Is it possible for us to have some > hierarchical way, e.g. > - No statistics are collected for files by default. > - SIZE: Generate statistics based on file size; get the size of the file > with only access to the master of the FileSystem. > - DETAILED: Get the complete statistics by format, possibly by accessing > the footer of the file. 
> > ## When to use the statistics reported by the connector > > > When partitions are pruned by PushPartitionIntoTableSourceScanRule, the > statistics should also be updated. > > I understand that we definitely need to use the reporter after the partition > prune, but another question: is the filter pushdown already happening at > this time? > Can we make sure that in the following three cases, both the filter > pushdown and the partition prune happen before the stats reporting? > - only partition prune happens > - only filter pushdown happens > - both filter pushdown and partition prune happen > > Best, > Jingsong > > On Fri, May 13, 2022 at 6:57 PM godfrey he wrote: > > > Hi all, > > > > I would like to open a discussion on FLIP-231: Introduce > > SupportStatisticReport > > to support reporting statistics from source connectors. > > > > Statistics are one of the most important inputs to the optimizer. > > Accurate and complete statistics allow the optimizer to be more powerful. > > Currently, the statistics of Flink SQL come from the Catalog only, > > while many connectors have the ability to provide statistics, e.g. > > FileSystem. > > In production, we find many tables in the Catalog do not have any statistics. > > As a result, the optimizer can't generate better execution plans, > > especially for batch jobs. > > > > There are two approaches to enhance statistics for the planner: > > one is to introduce the "ANALYZE TABLE" syntax which will write > > the analyzed result to the catalog, another is to introduce a new > > connector interface > > which allows the connector itself to report statistics directly to the > > planner. > > The second one is a supplement to the catalog statistics. > > > > Here, we will discuss the second approach. Compared to the first one, > > the second one gets statistics in real time, with no need to run an > > analysis job for each table. This could help improve the user > > experience. 
> > (We will also introduce the "ANALYZE TABLE" syntax in another FLIP.) > > > > You can find more details in the FLIP-231 document[1]. Looking forward to > > your feedback. > > > > [1] > > https://cwiki.apache.org/confluence/pages/resumedraft.action?draftId=211883860&draftShareId=eda17eaa-43f9-4dc1-9a7d-3a9b5a4bae00&; > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-231 > > > > > > Best, > > Godfrey > >
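The hierarchical granularity discussed above (no statistics / SIZE / DETAILED) can be sketched as a small enum plus a toy reporting interface. This is a hypothetical illustration, not the FLIP-231 API: the names `ReportGranularity`, `StatisticsReportingSource`, and `reportRowCount` are assumptions made for this sketch:

```java
import java.util.Optional;

// Hypothetical sketch of the statistics-granularity hierarchy from the thread.
// Names are illustrative, not actual FLIP-231 option values or interfaces.
public class StatisticsSketch {

    public enum ReportGranularity {
        NONE,     // collect no file statistics (cheapest, default)
        SIZE,     // file sizes only: a single call to the filesystem master
        DETAILED  // full statistics, e.g. by reading ORC/Parquet footers (expensive)
    }

    /** Toy stand-in for a connector that can report statistics to the planner. */
    public interface StatisticsReportingSource {
        /** Returns an estimated row count, or empty when statistics are unknown. */
        Optional<Long> reportRowCount(ReportGranularity granularity);
    }

    /** Example: a file source that only knows exact counts at DETAILED granularity. */
    public static StatisticsReportingSource fileSource(long exactRowCount) {
        return granularity -> granularity == ReportGranularity.DETAILED
                ? Optional.of(exactRowCount)
                : Optional.empty();
    }
}
```

Returning "unknown" (empty) at coarser granularities mirrors the thread's point that the planner falls back to unknown statistics when the connector cannot, or is configured not to, compute them.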
[jira] [Created] (FLINK-27656) Add parquet file format
Zheng Hu created FLINK-27656: Summary: Add parquet file format Key: FLINK-27656 URL: https://issues.apache.org/jira/browse/FLINK-27656 Project: Flink Issue Type: Sub-task Reporter: Zheng Hu The flink table store does not support parquet file format now. Will try to publish a PR to include parquet file format in the flink table store.
[jira] [Created] (FLINK-27655) Implement Avro File statistic collector
Zheng Hu created FLINK-27655: Summary: Implement Avro File statistic collector Key: FLINK-27655 URL: https://issues.apache.org/jira/browse/FLINK-27655 Project: Flink Issue Type: Sub-task Reporter: Zheng Hu Currently, the flink table store's Avro file writer doesn't provide its own file statistics collector, so we have to use the generic FieldStatsCollector. In fact, the correct direction is: make every format writer provide its own FileStatsCollector, so that we can just parse the columnar statistics from the file tailer instead of comparing each column's max/min while writing the records into the columnar file. In this way, I think we can just remove the FileFormatImpl class and the FieldStatsCollector class.
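The generic approach the issue wants to replace — tracking per-column min/max/null-count while records are written — can be illustrated with a minimal collector. This sketch is hypothetical (the name `ColumnStatsSketch` and the Long-typed column are assumptions, not the table store's actual classes):

```java
// Illustrative sketch of a per-column statistics collector of the kind the
// issue describes: observe each value as it is written and keep running
// min/max/null-count, rather than re-reading data afterwards.
// Not actual flink-table-store code; names and types are assumptions.
public class ColumnStatsSketch {
    private long nullCount;
    private Long min; // null until the first non-null value is seen
    private Long max;

    /** Observe one value of this column (null-aware). */
    public void collect(Long value) {
        if (value == null) {
            nullCount++;
            return;
        }
        if (min == null || value < min) {
            min = value;
        }
        if (max == null || value > max) {
            max = value;
        }
    }

    public long nullCount() { return nullCount; }
    public Long min() { return min; }
    public Long max() { return max; }
}
```

A format-aware writer (e.g. for Parquet or ORC) can skip this work entirely and read the same statistics back from the file's metadata, which is the issue's argument for per-format collectors.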
[VOTE] FLIP-229: Introduces Join Hint for Flink SQL Batch Job
Hi, everyone. Thanks for your feedback on FLIP-229: Introduces Join Hint for Flink SQL Batch Job [1] in the discussion thread [2]. I'd like to start a vote for it. The vote will be open for at least 72 hours unless there is an objection or not enough votes. -- Best! Xuyang [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job [2] https://lists.apache.org/thread/y668bxyjz66ggtjypfz9t571m0tyvv9h
Re: [DISCUSS] Next Flink Kubernetes Operator release timeline
Thanks, Gyula. It looks good to me. I'd also be glad to help during the release. Please feel free to ping me to help with the docs, release and test work :) Best, Aitozi On Mon, May 16, 2022 at 21:57, Yang Wang wrote: > Thanks Gyula for sharing the progress. It is very likely we could have the > first release candidate next Monday. > > Best, > Yang > > On Mon, May 16, 2022 at 20:50, Gyula Fóra wrote: > > Hi Devs! > > > > We are on track for our planned 1.0.0 release timeline. There are no > > outstanding blocker issues on JIRA for the release. > > > > There are 3 outstanding new feature PRs. They are all in pretty good shape > > and should be merged within a day: > > https://github.com/apache/flink-kubernetes-operator/pull/213 > > https://github.com/apache/flink-kubernetes-operator/pull/216 > > https://github.com/apache/flink-kubernetes-operator/pull/217 > > > > As we agreed previously, we should not merge any more new features for > > 1.0.0 and should focus our efforts on testing, bug fixes and documentation > > this week. > > > > I will cut the release branch tomorrow once these PRs are merged. And the > > target day for the first release candidate is next Monday. > > > > The release managers for this release will be Yang Wang and myself. > > > > Cheers, > > Gyula > > > > On Wed, Apr 27, 2022 at 11:28 AM Yang Wang wrote: > > > >> Thanks @Chesnay Schepler for pointing this out. > >> > >> The only public interface the flink-kubernetes-operator provides is the > >> CRD[1]. We are trying to stabilize the CRD from v1beta1. > >> If more fields are introduced to support new features (e.g. standalone mode, > >> SQL jobs), they should have default values to ensure compatibility. > >> Currently, we do not have any tools to enforce the compatibility > >> guarantees. But we have created a ticket[2] to follow this and hope it > >> could be resolved before releasing 1.0.0. > >> > >> Just as you said, now is also a good time to think more about the approach > >> of releases. 
Since flink-kubernetes-operator is much simpler than Flink, we > >> could have a shorter release cycle. > >> Two months for a major release (1.0, 1.1, etc.) is reasonable to me. And this > >> could be shortened for the minor releases. Also, we need to support at least > >> the last two major versions. > >> > >> Maybe the standalone mode support is a big enough feature for version 2.0. > >> > >> > >> [1]. > >> https://github.com/apache/flink-kubernetes-operator/tree/main/helm/flink-kubernetes-operator/crds > >> [2]. https://issues.apache.org/jira/browse/FLINK-26955 > >> > >> > >> @Hao t Chang We do not have a regular sync-up meeting so > >> far. But I think we could schedule some sync-ups for the 1.0.0 release if > >> necessary. Anyone who is interested is welcome. > >> > >> > >> Best, > >> Yang > >> > >> > >> On Wed, Apr 27, 2022 at 07:45, Hao t Chang wrote: > >> > Hi Gyula, > >> > > >> > Thanks for the release timeline information. I would like to learn the > >> > gathered knowledge and volunteer as well. Will there be a sync-up > >> > meeting/call for this collaboration? > >> > > >> > From: Gyula Fóra > >> > Date: Monday, April 25, 2022 at 11:22 AM > >> > To: dev > >> > Subject: [DISCUSS] Next Flink Kubernetes Operator release timeline > >> > Hi Devs! > >> > > >> > The community has been working hard on cleaning up the operator logic and > >> > adding some core features that have been missing from the preview release > >> > (session jobs for example). We have also added some significant > >> > improvements around deployment/operations. > >> > > >> > With the current pace of development I think in a few weeks we should > >> > be in a good position to release the next version of the operator. This would > >> > also give us the opportunity to add support for the upcoming 1.15 release > >> > :) > >> > > >> > We have to decide on 2 main things: > >> > 1. Target release date > >> > 2. 
Release version > >> > > >> > With the current state of the project I am confident that we could > cut a > >> > really good release candidate towards the end of May. I would suggest > a > >> > feature *freeze mid May (May 16)*, with a target *RC0 date of May 23*. > >> If > >> > on May 16 we feel that we are ready we could also prepare the release > >> > candidate earlier. > >> > > >> > As for the release version, I personally feel that this is a good time > >> > for *version > >> > 1.0.0*. > >> > While 1.0.0 signals a certain confidence in the stability of the > current > >> > API (compared to the preview release) I would keep the kubernetes > >> resource > >> > version v1beta1. > >> > > >> > It would also be great if someone could volunteer to join me to help > >> manage > >> > the release process this time so I can share the knowledge gathered > >> during > >> > the preview release :) > >> > > >> > Let me know what you think! > >> > > >> > Cheers, > >> > Gyula > >> > > >> > > >
[jira] [Created] (FLINK-27654) Older jackson-databind found in /flink-kubernetes-shaded-1.0-SNAPSHOT.jar
James Busche created FLINK-27654: Summary: Older jackson-databind found in /flink-kubernetes-shaded-1.0-SNAPSHOT.jar Key: FLINK-27654 URL: https://issues.apache.org/jira/browse/FLINK-27654 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-0.1.0 Reporter: James Busche A Twistlock security scan of the latest Kubernetes Flink operator shows an older version of jackson-databind in the /flink-kubernetes-shaded-1.0-SNAPSHOT.jar file. I don't know how to control/update the contents of this snapshot file. I see this in the report (otherwise, everything else looks good!): == severity: High cvss: 7.5 riskFactors: Attack complexity: low, Attack vector: network, DoS, Has fix, High severity cve: CVE-2020-36518 Link: [https://nvd.nist.gov/vuln/detail/CVE-2020-36518] packageName: com.fasterxml.jackson.core_jackson-databind packagePath: /flink-kubernetes-operator-1.0-SNAPSHOT-shaded.jar description: jackson-databind before 2.13.0 allows a Java StackOverflow exception and denial of service via a large depth of nested objects. = I'd be glad to try to fix it; I'm just not sure how the jackson-databind versions are controlled in this /flink-kubernetes-operator-1.0-SNAPSHOT-shaded.jar
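For a shaded jar, the bundled version of a transitive dependency is usually controlled from the project's pom.xml before the shade plugin runs. A hedged sketch of the typical Maven fix — whether this is the right place in the operator's build is an assumption, but 2.13.2.1 is a jackson-databind release that addresses CVE-2020-36518:

```xml
<!-- Hypothetical sketch: pin the transitive jackson-databind via
     dependencyManagement so the shade plugin bundles the pinned version.
     The exact pom to edit in flink-kubernetes-operator is an assumption. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.13.2.1</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```

Running `mvn dependency:tree -Dincludes=com.fasterxml.jackson.core` before and after such a change shows which module pulls in the old version and confirms the override took effect.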
[jira] [Created] (FLINK-27653) Pulsar Connector bug: The startCursor has been set to the default value of "MessageId.earliest"; every time the job restarts, the consumer does a seek operation.
wawa created FLINK-27653: Summary: Pulsar Connector bug: The startCursor has been set to the default value of "MessageId.earliest"; every time the job restarts, the consumer does a seek operation. Key: FLINK-27653 URL: https://issues.apache.org/jira/browse/FLINK-27653 Project: Flink Issue Type: Bug Components: API / DataStream Affects Versions: 1.14.3 Reporter: wawa Pulsar Connector bug: The startCursor has been set to the default value of 'MessageId.earliest', so every time the job restarts, the consumer does a seek operation. Of course, we can set '.setStartCursor(StartCursor.latest())'; then, when the job restarts, it will do the seek operation consumer.seek(MessageId.latest), and as a result some messages will be lost. What we really want is for the consumer to subscribe from where it stopped. In general, to subscribe from 'earliest' or 'latest', we can use the following operation instead of seek: ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) (see https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/ConsumerBuilder.html)
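The behavioral difference the issue describes can be captured with a toy model (this is plain Java, not Pulsar client code; the class and method names are made up for illustration): `seek()` repositions the cursor on *every* restart, while a subscription's initial position only applies when the subscription is first created, so later restarts resume from the stored cursor.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the issue's point, not Pulsar client code: seek() always
// repositions the cursor, whereas the subscription initial position only
// takes effect when the subscription is first created.
public class CursorModel {
    private final List<String> topic = new ArrayList<>();
    private Integer cursor; // null = subscription does not exist yet

    public void publish(String msg) {
        topic.add(msg);
    }

    /** Restart using an initial position: only applied to a NEW subscription. */
    public void subscribe(boolean fromEarliest) {
        if (cursor == null) {
            cursor = fromEarliest ? 0 : topic.size();
        } // otherwise: resume from the stored cursor, as the issue wants
    }

    /** Restart using an explicit seek(latest): always repositions the cursor. */
    public void seekLatest() {
        cursor = topic.size();
    }

    /** Consume everything from the cursor to the end of the topic. */
    public List<String> consumeAll() {
        List<String> out = new ArrayList<>(topic.subList(cursor, topic.size()));
        cursor = topic.size();
        return out;
    }
}
```

In the model, restarting via `seekLatest()` silently drops messages published while the consumer was down, while restarting via `subscribe(...)` on an existing subscription resumes from the stored cursor — exactly the resume-where-stopped behavior the reporter asks for.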
Re: [DISCUSS] FLIP-224: Blacklist Mechanism
Hi Konstantin, Maybe change it to the following: 1. POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id} Merge is not allowed. If the {id} already exists, return an error. Otherwise, create a new item. 2. POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge Merge is allowed. If the {id} already exists, merge. Otherwise, create a new item. WDYT? Best, Lijie On Mon, May 16, 2022 at 20:07, Konstantin Knauf wrote: > Hi Lijie, > > hm, maybe the following is more appropriate in that case: > > POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge > > Best, > > Konstantin > > On Mon, May 16, 2022 at 07:05, Lijie Wang wrote: > > Hi Konstantin, > > thanks for your feedback. > > > > From what I understand, PUT should be idempotent. However, we have a > > *timeout* field in the request. This means that initiating the same request > > at two different times will lead to a different resource status (the timestamps > > of the items to be removed will be different). > > > > Should we use PUT in this case? WDYT? > > > > Best, > > Lijie > > > > On Fri, May 13, 2022 at 17:20, Konstantin Knauf wrote: > > > Hi Lijie, > > > > > > wouldn't the REST-API-idiomatic way for an update/replace be a PUT on the > > > resource? > > > > > > PUT: http://{jm_rest_address:port}/blocklist/taskmanagers/{id} > > > > > > Best, > > > > > > Konstantin > > > > > > On Fri, May 13, 2022 at 11:01, Lijie Wang wrote: > > > > Hi everyone, > > > > > > > > I've had an offline discussion with Becket Qin and Zhu Zhu, and made the > > > > following changes to the REST API: > > > > 1. To avoid ambiguity, only one of *timeout* and *endTimestamp* may be > > > > specified. If both are specified, an error will be returned. > > > > 2. 
If the specified item is already there, the *ADD* operation has > two > > > > behaviors: *return error* (default value) or *merge/update*, and we > > add a > > > flag to the request body to control it. You can find more details in the > > "Public > > > Interface" section. > > > > If there is no more feedback, we will start the vote thread next > week. > > > > Best, > > > > Lijie > > > > > > > > Lijie Wang wrote on Tue, May 10, 2022, 17:14: > > > > > > > > > Hi Becket Qin, > > > > > > > > > > Thanks for your suggestions. I have moved the description of > > > > > configurations, metrics and REST API into the "Public Interface" > section, > > > and > > > > > made a few updates according to your suggestion. And in this FLIP, > > > there are > > > > > no public java Interfaces or pluggables that users need to > implement > > by > > > > > themselves. > > > > > > > > > > Answers to your questions: > > > > > 1. Yes, there are 2 block actions: MARK_BLOCKED and > > > > > MARK_BLOCKED_AND_EVACUATE_TASKS (has been renamed). Currently, block > items > > > can > > > > > only be added through the REST API, so these 2 actions are mentioned > > in > > > > the > > > > > REST API part (The REST API part has been moved to public interface > > > now). > > > > > 2. I agree with you. I have changed the "Cause" field to String, > and > > > > allow > > > > > users to specify it via the REST API. > > > > > 3. Yes, it is useful to allow different timeouts. As mentioned > above, > > > we > > > > > will introduce 2 fields: *timeout* and *endTimestamp* into the ADD > > > REST > > > > > API to specify when to remove the blocked item. These 2 fields are > > > > > optional; if neither is specified, it means that the blocked item > is > > > > > permanent and will not be removed. If both are specified, the > minimum > > > of > > > > > *currentTimestamp+timeout* and *endTimestamp* will be used as the > > time > > > to > > > > > remove the blocked item. 
To keep the configurations more minimal, > we > > > have > > > > > removed the *cluster.resource-blocklist.item.timeout* configuration > > > > > option. > > > > > 4. Yes, the block item will be overridden if the specified item > > already > > > > > exists. The ADD operation is *ADD or UPDATE*. > > > > > 5. Yes. On JM/RM side, all the blocklist information is maintained > in > > > > > JMBlocklistHandler/RMBlocklistHandler. The blocklist handler(or > > > > abstracted > > > > > to other interfaces) will be propagated to different components. > > > > > > > > > > Best, > > > > > Lijie > > > > > > > > > > Becket Qin 于2022年5月10日周二 11:26写道: > > > > > > > > > >> Hi Lijie, > > > > >> > > > > >> Thanks for updating the FLIP. It looks like the public interface > > > section > > > > >> did not fully reflect all the user sensible behavior and API. Can > > you > > > > put > > > > >> everything that users may be aware of there? That would include > the > > > REST > > > > >> API, metrics, configurations, public java Interfaces or pluggables > > > that > > > > >> users may see or implement by themselves, as well as a brief > summary > > > of > > > > >> the > > > > >> behavior of the public API. > > > > >> > > > > >> Besides that, I have a few ques
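To make the timeout semantics discussed in this thread concrete, here is a small Python sketch of the expiry rule (per the latest revision quoted above, *timeout* and *endTimestamp* are mutually exclusive, and an item with neither field is permanent). The function name and parameters are illustrative only, not actual Flink API:

```python
def blocked_item_removal_time(timeout_ms=None, end_timestamp_ms=None, now_ms=0):
    """Illustrative sketch (not Flink code) of the FLIP-224 blocklist expiry rule.

    Returns the timestamp (ms) at which a blocked item should be removed,
    or None when the item is permanent.
    """
    if timeout_ms is not None and end_timestamp_ms is not None:
        # latest revision of the FLIP: specifying both is an error
        raise ValueError("timeout and endTimestamp are mutually exclusive")
    if timeout_ms is not None:
        # remove the item `timeout` after it was added
        return now_ms + timeout_ms
    # explicit removal time, or None for a permanent item
    return end_timestamp_ms
```

A REST client would send at most one of the two fields in the ADD request body and treat the absence of both as a permanent block.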
[jira] [Created] (FLINK-27652) CompactManager.Rewriter cannot handle different partition keys invoked compaction
Jane Chan created FLINK-27652: - Summary: CompactManager.Rewriter cannot handle different partition keys invoked compaction Key: FLINK-27652 URL: https://issues.apache.org/jira/browse/FLINK-27652 Project: Flink Issue Type: Bug Components: Table Store Affects Versions: table-store-0.2.0 Reporter: Jane Chan Fix For: table-store-0.2.0 h3. Issue Description When {{commit.force-compact}} is enabled for a partitioned managed table, there is a chance that successive synchronized writes fail. The root cause is as follows. h3. Root Cause {code:java} Caused by: java.io.IOException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: java.io.FileNotFoundException: File file:/var/folders/xd/9dp1y4vd3h56kjkvdk426l50gn/T/junit5920507275110651781/junit4163667468681653619/default_catalog.catalog/default_database.db/T1/f1=Autumn/bucket-0/data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not exist or the user running Flink ('jane.cjm') has insufficient permissions to access it. at org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172) {code} However, data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not belong to partition Autumn. It seems the rewriter matched the wrong file to the partition/bucket. h3. How to Reproduce {code:java} /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.flink.table.store.connector; import org.junit.Test; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; /** A reproducible case. */ public class ForceCompactionITCase extends FileStoreTableITCase { @Override protected List ddl() { return Collections.singletonList( "CREATE TABLE IF NOT EXISTS T1 (" + "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY (f1)"); } @Test public void test() throws ExecutionException, InterruptedException { bEnv.executeSql("ALTER TABLE T1 SET ('num-levels' = '3')"); bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')"); bEnv.executeSql( "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming')" + ",(2, 'Winter', 'The First Snowflake'), " + "(2, 'Spring', 'The First Rose in Spring'), " + "(7, 'Summer', 'Summertime Sadness')") .await(); bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')").await(); bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')").await(); bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')").await(); bEnv.executeSql( "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), " + "(4, 'Spring', 'Spring Water')") .await(); bEnv.executeSql( "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe')," + " (9, 'Autumn', 'Wake Me Up When September Ends')") .await(); bEnv.executeSql( "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe')," + " (9, 'Autumn', 'Wake Me Up When September Ends')") .await(); bEnv.executeSql( "INSERT INTO T1 VALUES(, 'Summer', 'Summer Vibe')," + " (9, 'Autumn', 'Wake Me Up When September Ends')") .await(); bEnv.executeSql( "INSERT INTO T1 VALUES(6, 'Summer', 'Summer Vibe')," + " (9, 'Autumn', 'Wake Me Up When September Ends')") .await(); bEnv.executeSql( "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe')," + " (9, 'Autumn', 'Wake Me Up When September Ends')") .await(); 
bEnv.executeSql( "INSERT INTO T1 VALUES(666, 'Summer',
[jira] [Created] (FLINK-27651) Support CREATE FUNCTION USING JAR syntax
dalongliu created FLINK-27651: - Summary: Support CREATE FUNCTION USING JAR syntax Key: FLINK-27651 URL: https://issues.apache.org/jira/browse/FLINK-27651 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.16.0 Reporter: dalongliu Fix For: 1.16.0
[jira] [Created] (FLINK-27650) First environment variable of top level pod template is lost
Simon Paradis created FLINK-27650: - Summary: First environment variable of top level pod template is lost Key: FLINK-27650 URL: https://issues.apache.org/jira/browse/FLINK-27650 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: 1.14.4 Reporter: Simon Paradis I am using the Flink operator image *apache/flink-kubernetes-operator:0.1.0* to deploy a Flink 1.14.4 job. The deployment manifest uses the pod template feature to inject environment variables that control structured JSON logging. I noticed that the first defined environment variable is never injected into the JobManager or TaskManager pods. The workaround is to define a dummy environment variable first. Here's the manifest template. It gets processed by a tool that first expands ${ENV_VAR} references with values provided by our CI pipeline.
Re: [DISCUSS] Next Flink Kubernetes Operator release timeline
Thanks Gyula for sharing the progress. It is very likely we could have the first release candidate next Monday. Best, Yang Gyula Fóra 于2022年5月16日周一 20:50写道: > Hi Devs! > > We are on track for our planned 1.0.0 release timeline. There are no > outstanding blocker issues on JIRA for the release. > > There are 3 outstanding new feature PRs. They are all in pretty good shape > and should be merged within a day: > https://github.com/apache/flink-kubernetes-operator/pull/213 > https://github.com/apache/flink-kubernetes-operator/pull/216 > https://github.com/apache/flink-kubernetes-operator/pull/217 > > As we agreed previously we should not merge any more new features for > 1.0.0 and focus our efforts on testing, bug fixes and documentation for > this week. > > I will cut the release branch tomorrow once these PRs are merged. And the > target day for the first release candidate is next Monday. > > The release managers for this release will be Yang Wang and myself. > > Cheers, > Gyula > > On Wed, Apr 27, 2022 at 11:28 AM Yang Wang wrote: > >> Thanks @Chesnay Schepler for pointing out this. >> >> The only public interface the flink-kubernetes-operator provides is the >> CRD[1]. We are trying to stabilize the CRD from v1beta1. >> If more fields are introduced to support new features(e.g. standalone >> mode, >> SQL jobs), they should have the default value to ensure compatibility. >> Currently, we do not have some tools to enforce the compatibility >> guarantees. But we have created a ticket[1] to follow this and hope it >> could be resolved before releasing 1.0.0. >> >> Just as you said, now is also a good time to think more about the approach >> of releases. Since flink-kubernetes-operator is much simpler than Flink, >> we >> could have a shorter release cycle. >> Two month for a major release(1.0, 1.1, etc.) is reasonable to me. And >> this >> could be shorten for the minor releases. Also we need to support at least >> the last two major versions. 
>> >> Maybe the standalone mode support is a big enough feature for version 2.0. >> >> >> [1]. >> >> https://github.com/apache/flink-kubernetes-operator/tree/main/helm/flink-kubernetes-operator/crds >> [2]. https://issues.apache.org/jira/browse/FLINK-26955 >> >> >> @Hao t Chang We do not have regular sync up meeting >> so >> far. But I think we could schedule some sync up for the 1.0.0 release if >> necessary. Anyone who is interested are welcome. >> >> >> Best, >> Yang >> >> >> >> >> Hao t Chang 于2022年4月27日周三 07:45写道: >> >> > Hi Gyula, >> > >> > Thanks for the release timeline information. I would like to learn the >> > gathered knowledge and volunteer as well. Will there be sync up >> > meeting/call for this collaboration ? >> > >> > From: Gyula Fóra >> > Date: Monday, April 25, 2022 at 11:22 AM >> > To: dev >> > Subject: [DISCUSS] Next Flink Kubernetes Operator release timeline >> > Hi Devs! >> > >> > The community has been working hard on cleaning up the operator logic >> and >> > adding some core features that have been missing from the preview >> release >> > (session jobs for example). We have also added some significant >> > improvements around deployment/operations. >> > >> > With the current pace of the development I think in a few weeks we >> should >> > be in a good position to release next version of the operator. This >> would >> > also give us the opportunity to add support for the upcoming 1.15 >> release >> > :) >> > >> > We have to decide on 2 main things: >> > 1. Target release date >> > 2. Release version >> > >> > With the current state of the project I am confident that we could cut a >> > really good release candidate towards the end of May. I would suggest a >> > feature *freeze mid May (May 16)*, with a target *RC0 date of May 23*. >> If >> > on May 16 we feel that we are ready we could also prepare the release >> > candidate earlier. 
>> > >> > As for the release version, I personally feel that this is a good time >> > for *version >> > 1.0.0*. >> > While 1.0.0 signals a certain confidence in the stability of the current >> > API (compared to the preview release) I would keep the kubernetes >> resource >> > version v1beta1. >> > >> > It would also be great if someone could volunteer to join me to help >> manage >> > the release process this time so I can share the knowledge gathered >> during >> > the preview release :) >> > >> > Let me know what you think! >> > >> > Cheers, >> > Gyula >> > >> >
Re: taskexecutor .out files
Sorry, the command was parsed as a quote. The real command is: "> taskmanager.out" Best, Weihua > On May 16, 2022 at 9:52 PM, Weihua Hu wrote: > > Hi, > > Flink redirects stdout to taskmanager.out when starting the TaskManager. > If taskmanager.out is deleted, Flink cannot automatically recreate > taskmanager.out, which means any subsequent output to stdout will be lost. > > If you want to clean up the content of taskmanager.out, you can try using: > >> taskmanager.out > > This operation does not modify the inode of the file and ensures that the > TaskManager can continue to redirect stdout to taskmanager.out. > > Best, > Weihua > >> On May 15, 2022 at 7:04 PM, Zain Haider Nemati wrote: >> >> Hi, >> I have been running a streaming job which prints data to .out files. The size >> of the file has gotten really large and is choking the root memory for my >> VM. Is it ok to delete the .out files? Would that affect any other operation >> or functionality? >
Re: taskexecutor .out files
Hi, Flink redirects stdout to taskmanager.out when starting the TaskManager. If taskmanager.out is deleted, Flink cannot automatically recreate taskmanager.out, which means any subsequent output to stdout will be lost. If you want to clean up the content of taskmanager.out, you can try using: > taskmanager.out This operation does not modify the inode of the file and ensures that the TaskManager can continue to redirect stdout to taskmanager.out. Best, Weihua > On May 15, 2022 at 7:04 PM, Zain Haider Nemati wrote: > > Hi, > I have been running a streaming job which prints data to .out files. The size > of the file has gotten really large and is choking the root memory for my VM. > Is it ok to delete the .out files? Would that affect any other operation or > functionality?
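The difference between deleting the file and truncating it in place can be shown with a short Python sketch (this is not Flink code; a temporary stand-in path is used). Truncating keeps the same inode, so the file descriptor Flink holds for the stdout redirect keeps working while the visible content is emptied:

```python
import os
import tempfile

# Stand-in for taskmanager.out.
path = os.path.join(tempfile.mkdtemp(), "taskmanager.out")
with open(path, "w") as f:
    f.write("lots of stdout output...\n")
inode_before = os.stat(path).st_ino

# Truncate in place, the equivalent of the shell command "> taskmanager.out".
with open(path, "w"):
    pass

inode_after = os.stat(path).st_ino
assert inode_after == inode_before  # same inode: existing redirects keep working
assert os.path.getsize(path) == 0   # content has been cleaned up
```

Deleting the file instead (os.remove) would leave the process writing to an unlinked inode, and a new taskmanager.out would not appear until the TaskManager restarts.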
[jira] [Created] (FLINK-27649) Reduce the number of outputted log lines by Elasticsearch6SinkE2ECase and Elasticsearch7SinkE2ECase
Martijn Visser created FLINK-27649: -- Summary: Reduce the number of outputted log lines by Elasticsearch6SinkE2ECase and Elasticsearch7SinkE2ECase Key: FLINK-27649 URL: https://issues.apache.org/jira/browse/FLINK-27649 Project: Flink Issue Type: Technical Debt Components: Connectors / ElasticSearch Reporter: Martijn Visser Assignee: Alexander Preuss The current ElasticSearch tests create a large number of log lines, see https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35694&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=160c9ae5-96fd-516e-1c91-deb81f59292a&l=14702 as an example. We should disable the logging by default.
[jira] [Created] (FLINK-27648) Review example YAMLs in the documentation
Gyula Fora created FLINK-27648: -- Summary: Review example YAMLs in the documentation Key: FLINK-27648 URL: https://issues.apache.org/jira/browse/FLINK-27648 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora Fix For: kubernetes-operator-1.0.0 The various documentation pages contain example YAMLs for FlinkDeployments that do not reflect the latest state of the project. Some of these would not even run anymore; we should review and update them.
[jira] [Created] (FLINK-27647) Improve Metrics documentation to include newly added metrics
Gyula Fora created FLINK-27647: -- Summary: Improve Metrics documentation to include newly added metrics Key: FLINK-27647 URL: https://issues.apache.org/jira/browse/FLINK-27647 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora Assignee: Matyas Orhidi Fix For: kubernetes-operator-1.0.0 We now support a few operator-specific metrics out of the box; we should improve the metrics documentation to highlight them.
[jira] [Created] (FLINK-27646) Create Roadmap page for Flink Kubernetes operator
Gyula Fora created FLINK-27646: -- Summary: Create Roadmap page for Flink Kubernetes operator Key: FLINK-27646 URL: https://issues.apache.org/jira/browse/FLINK-27646 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora Fix For: kubernetes-operator-1.0.0 We should create a dedicated wiki page for the current roadmap of the operator and link it to the overview page in our docs.
[jira] [Created] (FLINK-27645) Update overview / supported features page for 1.0.0
Gyula Fora created FLINK-27645: -- Summary: Update overview / supported features page for 1.0.0 Key: FLINK-27645 URL: https://issues.apache.org/jira/browse/FLINK-27645 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora Fix For: kubernetes-operator-1.0.0 A lot of new features have been implemented and Flink 1.15 support also brings a lot of valuable additions. We should update the overview page with the supported features to reflect the new developments.
[jira] [Created] (FLINK-27644) Update CRD documentation with new spec/status changes
Gyula Fora created FLINK-27644: -- Summary: Update CRD documentation with new spec/status changes Key: FLINK-27644 URL: https://issues.apache.org/jira/browse/FLINK-27644 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora Fix For: kubernetes-operator-1.0.0 There are a number of new features / changes that are not reflected in the current documentation for the CRD. We should update these for the release.
[jira] [Created] (FLINK-27643) Document new deployment lifecycle features for the operator
Gyula Fora created FLINK-27643: -- Summary: Document new deployment lifecycle features for the operator Key: FLINK-27643 URL: https://issues.apache.org/jira/browse/FLINK-27643 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora Assignee: Gyula Fora Fix For: kubernetes-operator-1.0.0 We should document the changes and new features to the core lifecycle management logic, including: * JM Deployment Recovery * Rollbacks * Any changed upgrade behavior
[jira] [Created] (FLINK-27642) Make sure that the Elasticsearch E2E tests only try a limited amount of retries in case of test failures
Martijn Visser created FLINK-27642: -- Summary: Make sure that the Elasticsearch E2E tests only try a limited amount of retries in case of test failures Key: FLINK-27642 URL: https://issues.apache.org/jira/browse/FLINK-27642 Project: Flink Issue Type: Improvement Components: Connectors / ElasticSearch Reporter: Martijn Visser The current Elasticsearch E2E tests keep retrying the test infinitely; we should limit the number of retries and otherwise cancel the CI run.
Re: [DISCUSS] Next Flink Kubernetes Operator release timeline
Hi Devs! We are on track for our planned 1.0.0 release timeline. There are no outstanding blocker issues on JIRA for the release. There are 3 outstanding new feature PRs. They are all in pretty good shape and should be merged within a day: https://github.com/apache/flink-kubernetes-operator/pull/213 https://github.com/apache/flink-kubernetes-operator/pull/216 https://github.com/apache/flink-kubernetes-operator/pull/217 As we agreed previously we should not merge any more new features for 1.0.0 and focus our efforts on testing, bug fixes and documentation for this week. I will cut the release branch tomorrow once these PRs are merged. And the target day for the first release candidate is next Monday. The release managers for this release will be Yang Wang and myself. Cheers, Gyula On Wed, Apr 27, 2022 at 11:28 AM Yang Wang wrote: > Thanks @Chesnay Schepler for pointing out this. > > The only public interface the flink-kubernetes-operator provides is the > CRD[1]. We are trying to stabilize the CRD from v1beta1. > If more fields are introduced to support new features(e.g. standalone mode, > SQL jobs), they should have the default value to ensure compatibility. > Currently, we do not have some tools to enforce the compatibility > guarantees. But we have created a ticket[1] to follow this and hope it > could be resolved before releasing 1.0.0. > > Just as you said, now is also a good time to think more about the approach > of releases. Since flink-kubernetes-operator is much simpler than Flink, we > could have a shorter release cycle. > Two month for a major release(1.0, 1.1, etc.) is reasonable to me. And this > could be shorten for the minor releases. Also we need to support at least > the last two major versions. > > Maybe the standalone mode support is a big enough feature for version 2.0. > > > [1]. > > https://github.com/apache/flink-kubernetes-operator/tree/main/helm/flink-kubernetes-operator/crds > [2]. 
https://issues.apache.org/jira/browse/FLINK-26955 > > > @Hao t Chang We do not have regular sync up meeting > so > far. But I think we could schedule some sync up for the 1.0.0 release if > necessary. Anyone who is interested are welcome. > > > Best, > Yang > > > > > Hao t Chang 于2022年4月27日周三 07:45写道: > > > Hi Gyula, > > > > Thanks for the release timeline information. I would like to learn the > > gathered knowledge and volunteer as well. Will there be sync up > > meeting/call for this collaboration ? > > > > From: Gyula Fóra > > Date: Monday, April 25, 2022 at 11:22 AM > > To: dev > > Subject: [DISCUSS] Next Flink Kubernetes Operator release timeline > > Hi Devs! > > > > The community has been working hard on cleaning up the operator logic and > > adding some core features that have been missing from the preview release > > (session jobs for example). We have also added some significant > > improvements around deployment/operations. > > > > With the current pace of the development I think in a few weeks we should > > be in a good position to release next version of the operator. This would > > also give us the opportunity to add support for the upcoming 1.15 release > > :) > > > > We have to decide on 2 main things: > > 1. Target release date > > 2. Release version > > > > With the current state of the project I am confident that we could cut a > > really good release candidate towards the end of May. I would suggest a > > feature *freeze mid May (May 16)*, with a target *RC0 date of May 23*. If > > on May 16 we feel that we are ready we could also prepare the release > > candidate earlier. > > > > As for the release version, I personally feel that this is a good time > > for *version > > 1.0.0*. > > While 1.0.0 signals a certain confidence in the stability of the current > > API (compared to the preview release) I would keep the kubernetes > resource > > version v1beta1. 
> > > > It would also be great if someone could volunteer to join me to help > manage > > the release process this time so I can share the knowledge gathered > during > > the preview release :) > > > > Let me know what you think! > > > > Cheers, > > Gyula > > >
[jira] [Created] (FLINK-27641) Create view lost Time attribute in Hive Catalog
Robert Wu created FLINK-27641: - Summary: Create view lost Time attribute in Hive Catalog Key: FLINK-27641 URL: https://issues.apache.org/jira/browse/FLINK-27641 Project: Flink Issue Type: Bug Components: Table SQL / Client, Table SQL / Runtime Affects Versions: 1.14.4, 1.12.3 Reporter: Robert Wu Create a table in the Hive catalog with the following SQL statement: {code:java} CREATE TABLE user_score( username varchar, score varchar, proctime AS PROCTIME() ) with ( 'connector'='datagen', 'rows-per-second'='2', 'fields.score.length'='2', 'fields.username.length'='2' );{code} We can get the description: {code:java} DESCRIBE user_score;
+----------+-----------------------------+-------+-----+---------------+-----------+
|     name |                        type |  null | key |        extras | watermark |
+----------+-----------------------------+-------+-----+---------------+-----------+
| username |                      STRING |  true |     |               |           |
|    score |                      STRING |  true |     |               |           |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false |     | AS PROCTIME() |           |
+----------+-----------------------------+-------+-----+---------------+-----------+
{code} However, a view created in the Hive catalog loses the time attribute on the proctime field: {code:java} create view view_score_hive_catalog as select * from user_score;{code} {code:java} DESCRIBE view_score_hive_catalog;
+----------+------------------+-------+-----+--------+-----------+
|     name |             type |  null | key | extras | watermark |
+----------+------------------+-------+-----+--------+-----------+
| username |           STRING |  true |     |        |           |
|    score |           STRING |  true |     |        |           |
| proctime | TIMESTAMP_LTZ(3) | false |     |        |           |
+----------+------------------+-------+-----+--------+-----------+
{code} By contrast, when we execute the same statement in the default catalog, the result changes: {code:java} Create view view_score_mem_catalog as select * from myhive.[hive_database].user_score;{code} {code:java} DESCRIBE view_score_mem_catalog;
+----------+-----------------------------+-------+-----+--------+-----------+
|     name |                        type |  null | key | extras | watermark |
+----------+-----------------------------+-------+-----+--------+-----------+
| username |                      STRING |  true |     |        |           |
|    score |                      STRING |  true |     |        |           |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false |     |        |           |
+----------+-----------------------------+-------+-----+--------+-----------+
{code}
[jira] [Created] (FLINK-27640) Flink not compiling, flink-connector-hive_2.12 is missing pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde
Piotr Nowojski created FLINK-27640: -- Summary: Flink not compiling, flink-connector-hive_2.12 is missing pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde Key: FLINK-27640 URL: https://issues.apache.org/jira/browse/FLINK-27640 Project: Flink Issue Type: Bug Components: Build System, Connectors / Hive Affects Versions: 1.16.0 Reporter: Piotr Nowojski When clean installing the whole project after cleaning the local {{.m2}} directory, I encountered the following error while compiling flink-connector-hive_2.12: {noformat} [ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could not resolve dependencies for project org.apache.flink:flink-connector-hive_2.12:jar:1.16-SNAPSHOT: Failed to collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read artifact descriptor for org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [conjars (http://conjars.org/repo, default, releases+snapshots), apache.snapshots (http://repository.apache.org/snapshots, default, snapshots)] -> [Help 1] {noformat} I've solved this by adding a repository entry {noformat} <repository> <id>spring-repo-plugins</id> <url>https://repo.spring.io/ui/native/plugins-release/</url> </repository> {noformat} to the ~/.m2/settings.xml file.
Re: [DISCUSS] FLIP-224: Blacklist Mechanism
Hi Lijie, hm, maybe the following is more appropriate in that case POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge Best, Konstantin On Mon, May 16, 2022 at 07:05, Lijie Wang < wangdachui9...@gmail.com> wrote: > Hi Konstantin, > thanks for your feedback. > > From what I understand, PUT should be idempotent. However, we have a > *timeout* field in the request. This means that initiating the same request > at two different times will lead to different resource status (timestamps > of the items to be removed will be different). > > Should we use PUT in this case? WDYT? > > Best, > Lijie > > Konstantin Knauf wrote on Fri, May 13, 2022, 17:20: > > > Hi Lijie, > > > > wouldn't the REST API-idiomatic way for an update/replace be a PUT on the > > resource? > > > > PUT: http://{jm_rest_address:port}/blocklist/taskmanagers/{id} > > > > Best, > > > > Konstantin > > > > > > > > On Fri, May 13, 2022 at 11:01, Lijie Wang < > > wangdachui9...@gmail.com> wrote: > > > > > Hi everyone, > > > > > > I've had an offline discussion with Becket Qin and Zhu Zhu, and made the > > > following changes to the REST API: > > > 1. To avoid ambiguity, only one of *timeout* and *endTimestamp* may be chosen. > > If > > > both are specified, an error will be returned. > > > 2. If the specified item is already there, the *ADD* operation has two > > > behaviors: *return error* (default value) or *merge/update*, and we > add a > > > flag to the request body to control it. You can find more details in the > "Public > > > Interface" section. > > > > > > If there is no more feedback, we will start the vote thread next week. > > > > > > Best, > > > Lijie > > > > > > Lijie Wang wrote on Tue, May 10, 2022, 17:14: > > > > > > > Hi Becket Qin, > > > > > > > > Thanks for your suggestions. I have moved the description of > > > > configurations, metrics and REST API into the "Public Interface" section, > > and > > > > made a few updates according to your suggestion. 
And in this FLIP, > > there are > > > > no public java Interfaces or pluggables that users need to implement by > > > > themselves. > > > > > > > > Answers to your questions: > > > > 1. Yes, there are 2 block actions: MARK_BLOCKED and > > > > MARK_BLOCKED_AND_EVACUATE_TASKS (has been renamed). Currently, block items > > can > > > > only be added through the REST API, so these 2 actions are mentioned > in > > > the > > > > REST API part (The REST API part has been moved to public interface > > now). > > > > 2. I agree with you. I have changed the "Cause" field to String, and > > > allow > > > > users to specify it via the REST API. > > > > 3. Yes, it is useful to allow different timeouts. As mentioned above, > > we > > > > will introduce 2 fields: *timeout* and *endTimestamp* into the ADD > > REST > > > > API to specify when to remove the blocked item. These 2 fields are > > > > optional; if neither is specified, it means that the blocked item is > > > > permanent and will not be removed. If both are specified, the minimum > > of > > > > *currentTimestamp+timeout* and *endTimestamp* will be used as the time > > to > > > > remove the blocked item. To keep the configurations more minimal, we > > have > > > > removed the *cluster.resource-blocklist.item.timeout* configuration > > > > option. > > > > 4. Yes, the block item will be overridden if the specified item already > > > > exists. The ADD operation is *ADD or UPDATE*. > > > > 5. Yes. On the JM/RM side, all the blocklist information is maintained in > > > > JMBlocklistHandler/RMBlocklistHandler. The blocklist handler (or > > > abstracted > > > > to other interfaces) will be propagated to different components. > > > > > > > > Best, > > > > Lijie > > > > > > > > Becket Qin wrote on Tue, May 10, 2022, 11:26: > > > >> Hi Lijie, > > > >> > > > >> Thanks for updating the FLIP. It looks like the public interface > > section > > > >> did not fully reflect all the user-sensible behavior and API. 
Can > you > > > put > > > >> everything that users may be aware of there? That would include the > > REST > > > >> API, metrics, configurations, public java Interfaces or pluggables > > that > > > >> users may see or implement by themselves, as well as a brief summary > > of > > > >> the > > > >> behavior of the public API. > > > >> > > > >> Besides that, I have a few questions: > > > >> > > > >> 1. According to the conversation in the discussion thread, it looks > > like > > > >> the BlockAction will have "MARK_BLOCKLISTED" and > > > >> "MARK_BLOCKLISTED_AND_EVACUATE_TASKS". Is that the case? If so, can > > you > > > >> add > > > >> that to the public interface as well? > > > >> > > > >> 2. At this point, the "Cause" field in the BlockingItem is a > Throwable > > > and > > > >> is not reflected in the REST API. Should that be included in the > query > > > >> response? And should we change that field to be a String so users > may > > > >> specify the cause via the REST API when they block some nodes / TMs? > > > >> > > > >> 3. Would it be useful to allow users to have different timeou
[jira] [Created] (FLINK-27639) Flink JOIN uses the now() function when inserting data, resulting in data that cannot be deleted
lvycc created FLINK-27639: - Summary: Flink JOIN uses the now() function when inserting data, resulting in data that cannot be deleted Key: FLINK-27639 URL: https://issues.apache.org/jira/browse/FLINK-27639 Project: Flink Issue Type: Bug Affects Versions: 1.14.4 Reporter: lvycc

I use the now() function as a field value when I insert data using SQL, but I can't delete the inserted data. Here is my SQL:
{code:java}
CREATE TABLE t_order (
    order_id INT,
    order_name STRING,
    product_id INT,
    user_id INT,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'ycc123',
    'database-name' = 'wby_test',
    'table-name' = 't_order'
);

CREATE TABLE t_logistics (
    logistics_id INT,
    logistics_target STRING,
    logistics_source STRING,
    logistics_time TIMESTAMP(0),
    order_id INT,
    PRIMARY KEY(logistics_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'ycc123',
    'database-name' = 'wby_test',
    'table-name' = 't_logistics'
);

CREATE TABLE t_join_sink (
    order_id INT,
    order_name STRING,
    logistics_id INT,
    logistics_target STRING,
    logistics_source STRING,
    logistics_time TIMESTAMP,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/wby_test?characterEncoding=utf8&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai',
    'table-name' = 't_join_sink',
    'username' = 'root',
    'password' = 'ycc123'
);

INSERT INTO t_join_sink
SELECT ord.order_id,
       ord.order_name,
       logistics.logistics_id,
       logistics.logistics_target,
       logistics.logistics_source,
       now()
FROM t_order AS ord
LEFT JOIN t_logistics AS logistics ON ord.order_id = logistics.order_id;
{code}
Debugging shows that SinkUpsertMaterializer causes the problem: the result of the now() function changes when I delete the data, therefore the delete operation is ignored. But what can I do to avoid this problem?
-- This message was sent by Atlassian Jira (v8.20.7#820007)
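The failure mode the reporter describes can be modeled with a toy materializer (this is an illustration only, not Flink's actual SinkUpsertMaterializer): a retraction (DELETE) is matched against previously emitted rows by value, so when now() is re-evaluated at delete time it produces a row that matches nothing, and the retraction is dropped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an upsert materializer: rows inserted per primary key are
// matched by full row value when a retraction (DELETE) arrives. This only
// illustrates the failure mode described in the ticket; it is not Flink code.
class MaterializerToy {
    private final Map<Integer, List<List<Object>>> rowsByKey = new HashMap<>();

    void insert(int key, List<Object> row) {
        rowsByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }

    /** Returns true if a matching row was found and removed. */
    boolean delete(int key, List<Object> row) {
        List<List<Object>> rows = rowsByKey.get(key);
        return rows != null && rows.remove(row);
    }

    public static void main(String[] args) {
        MaterializerToy m = new MaterializerToy();
        // INSERT evaluated now() -> "12:00:00"
        m.insert(1, Arrays.asList("order-1", "12:00:00"));
        // DELETE re-evaluates now() -> "12:00:05": no stored row matches,
        // so the retraction is silently ignored.
        boolean withReevaluatedNow = m.delete(1, Arrays.asList("order-1", "12:00:05"));
        // With a deterministic value, the same row is found and removed.
        boolean withSameValue = m.delete(1, Arrays.asList("order-1", "12:00:00"));
        System.out.println(withReevaluatedNow); // false
        System.out.println(withSameValue);      // true
    }
}
```

Consistent with this model, a general workaround (not an official answer from the ticket) is to avoid non-deterministic functions in the projection, e.g. carrying a timestamp column from the source tables instead of calling now() in the INSERT.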
Re: What is the alternative schema of DirectOutput
Hi, Boot. For DirectedOutput, I guess you want to split a stream. For splitting streams, you can use side output [1]. [1]: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/side_output/ Best regards, Yuxia - Original Message - From: "Boot" <331233...@qq.com.INVALID> To: "dev" Sent: Monday, May 16, 2022 5:59:17 PM Subject: Fw: What is the alternative schema of DirectOutput The correct class name is DirectedOutput ---Original--- From: "Boot"<331233...@qq.com> Date: Mon, May 16, 2022 17:19 PM To: "user"
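The side-output approach Yuxia points to routes each record, in a single pass, either to the main output or to a tagged side output. A tiny Flink-free model of the concept (the class and method names here are invented for illustration; Flink's real API uses OutputTag and ProcessFunction's Context#output, see the linked docs):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Flink-free sketch of the side-output idea: one pass over the input,
// routing each element to the main output or to a named side output.
class SideOutputToy {
    final List<Integer> main = new ArrayList<>();
    final Map<String, List<Integer>> sides = new HashMap<>();

    void process(List<Integer> input, String tag, Predicate<Integer> toSide) {
        for (int v : input) {
            if (toSide.test(v)) {
                // analogous to ctx.output(outputTag, v) in a ProcessFunction
                sides.computeIfAbsent(tag, t -> new ArrayList<>()).add(v);
            } else {
                main.add(v); // analogous to out.collect(v)
            }
        }
    }

    public static void main(String[] args) {
        SideOutputToy t = new SideOutputToy();
        t.process(Arrays.asList(1, 2, 3, 4, 5), "evens", v -> v % 2 == 0);
        System.out.println(t.main);            // [1, 3, 5]
        System.out.println(t.sides.get("evens")); // [2, 4]
    }
}
```

Unlike the removed split/DirectedOutput mechanism, side outputs also allow the two outputs to have different types.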
Fw: What is the alternative schema of DirectOutput
The correct class name is DirectedOutput ---Original--- From: "Boot"<331233...@qq.com> Date: Mon, May 16, 2022 17:19 PM To: "user"
[jira] [Created] (FLINK-27638) failed to join with table function
Spongebob created FLINK-27638: - Summary: failed to join with table function Key: FLINK-27638 URL: https://issues.apache.org/jira/browse/FLINK-27638 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.14.3 Reporter: Spongebob Attachments: image-2022-05-16-19-08-25-477.png, image-2022-05-16-19-08-50-286.png # Register one table function named `GET_STREAMING_MODEL_SINK_FILTER`. # Create two complex Flink SQL DMLs that both inner join with the table function. # Schedule the two DMLs in one submission based on a statementSet. After these steps I found that the table function was run on one exclusive task, and it turned out to be finished in several seconds. And the two DMLs had no output after the inner join with that table function. Appendix `image-2022-05-16-19-08-25-477.png` shows the schedule graph of this situation. And appendix `image-2022-05-16-19-08-50-286.png` shows the expected graph when using a table function. !image-2022-05-16-19-08-25-477.png! !image-2022-05-16-19-08-50-286.png! -- This message was sent by Atlassian Jira (v8.20.7#820007)
[RESULT][VOTE] FLIP-226: Introduce Schema Evolution on Table Store
Hi dev, FLIP-226 [1] has been accepted. The vote [2] received 3 binding votes in favor, none against. Votes are in the order of arrival: - Binding: Jing Zhang - Binding: Jark Wu - Binding: Jingsong Lee [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-226%3A+Introduce+Schema+Evolution+on+Table+Store [2] https://lists.apache.org/thread/lg5txz95mgko4mp6fqcwt1dd1hbjctjy Best, Jingsong
Re: [VOTE] FLIP-226: Introduce Schema Evolution on Table Store
+1 (binding) Best, Jingsong On Mon, May 16, 2022 at 2:15 PM Jark Wu wrote: > +1 (binding) > > Best, > Jark > > On Mon, 16 May 2022 at 13:50, Jing Zhang wrote: > > > +1 > > Thanks @ Jingsong for driving this topic. > > > > Best, > > Jing Zhang > > > > Jingsong Li 于2022年5月12日周四 17:06写道: > > > > > Hi, everyone > > > > > > Thanks all for your attention to FLIP-226: Introduce Schema Evolution > on > > > Table Store [1] and participation in the discussion in the mail thread > > [2]. > > > > > > I'd like to start a vote for it. The vote will be open for at least 72 > > > hours unless there is an objection or not enough votes. > > > > > > [1] > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-226%3A+Introduce+Schema+Evolution+on+Table+Store > > > [2] https://lists.apache.org/thread/sls26s8y55tfh59j2dqkgczml6km49jx > > > > > >
[jira] [Created] (FLINK-27637) Optimize the log information when the asynchronous part of checkpoint is canceled
Lijie Wang created FLINK-27637: -- Summary: Optimize the log information when the asynchronous part of checkpoint is canceled Key: FLINK-27637 URL: https://issues.apache.org/jira/browse/FLINK-27637 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Affects Versions: 1.16.0 Reporter: Lijie Wang When the checkpoint is aborted due to expiration, the tasks whose asynchronous part of the checkpoint is not completed will print the following logs:
{code:java}
60477 [AsyncOperations-thread-1] INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - DeclineSink (1/1)#0 - asynchronous part of checkpoint 2 could not be completed.
java.util.concurrent.CancellationException: null
    at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_241]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_241]
    at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:645) ~[classes/:?]
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:60) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[classes/:?]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) [classes/:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_241]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_241]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_241]
{code}
Maybe we can optimize these logs to make them friendlier. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27636) Add data type coverage and sync / async tests for catalog in connector testing framework
Qingsheng Ren created FLINK-27636: - Summary: Add data type coverage and sync / async tests for catalog in connector testing framework Key: FLINK-27636 URL: https://issues.apache.org/jira/browse/FLINK-27636 Project: Flink Issue Type: Sub-task Components: Tests Affects Versions: 1.16.0 Reporter: Qingsheng Ren -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27635) Add data type coverage and abilities test cases for table connectors in testing framework
Qingsheng Ren created FLINK-27635: - Summary: Add data type coverage and abilities test cases for table connectors in testing framework Key: FLINK-27635 URL: https://issues.apache.org/jira/browse/FLINK-27635 Project: Flink Issue Type: Sub-task Components: Tests Affects Versions: 1.16.0 Reporter: Qingsheng Ren -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27634) Add component failure cases in connector testing framework
Qingsheng Ren created FLINK-27634: - Summary: Add component failure cases in connector testing framework Key: FLINK-27634 URL: https://issues.apache.org/jira/browse/FLINK-27634 Project: Flink Issue Type: Sub-task Components: Tests Affects Versions: 1.16.0 Reporter: Qingsheng Ren -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27633) Add FLIP-33 metric validation case in connector testing framework
Qingsheng Ren created FLINK-27633: - Summary: Add FLIP-33 metric validation case in connector testing framework Key: FLINK-27633 URL: https://issues.apache.org/jira/browse/FLINK-27633 Project: Flink Issue Type: Sub-task Components: Tests Affects Versions: 1.16.0 Reporter: Qingsheng Ren -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27632) Improve connector testing framework to support more cases
Qingsheng Ren created FLINK-27632: - Summary: Improve connector testing framework to support more cases Key: FLINK-27632 URL: https://issues.apache.org/jira/browse/FLINK-27632 Project: Flink Issue Type: Improvement Components: Tests Affects Versions: 1.16.0 Reporter: Qingsheng Ren In order to make connector testing framework available for more connectors, including Table /SQL connectors, more test cases are required to cover more scenarios. -- This message was sent by Atlassian Jira (v8.20.7#820007)
Re: [DISCUSS] FLIP-232: Add Retry Support For Async I/O In DataStream API
Hi Jingsong, Good question! The delayQueue is very similar to incompleteElements in UnorderedStreamElementQueue: it only records references to in-flight retry elements. Its core value is enabling a fast scan when force-flushing during endInput, with less refactoring of the existing logic. Users needn't configure a new capacity for the delayQueue, just turn the original one up (if needed). Storing the input data and retry state separately is mainly to implement backwards compatibility. In the first version of the PoC, I used a single combined state in order to reduce state costs, but it was hard to keep compatibility, so I changed it into two states following Yun Gao's concern about compatibility. Best, Lincoln Lee Jingsong Li 于2022年5月16日周一 14:48写道: > Thanks Lincoln for your reply. > > I'm a little confused about the relationship between the Ordered/Unordered Queue and the DelayQueue. Why do we need to have a DelayQueue? > Can we remove the DelayQueue and put the retry state in the StreamRecordQueueEntry (it seems like it's already in the FLIP)? > The advantages of doing this are: > 1. half as much data is stored in state > 2. the concept is unified; the user only needs to configure one queue capacity > > Best, > Jingsong > > On Mon, May 16, 2022 at 12:10 PM Lincoln Lee wrote: > > > Hi Jingsong, > > Thanks for your feedback! Let me try to answer the two questions: > > > > For q1: Motivation > > Yes, users can implement retries themselves based on the external async client, but this requires each user to do similar things; if we can support retries uniformly, user code becomes much simpler. > > > > > The real external call should happen in the asynchronous thread. > > My question is: If the user makes a retry in this asynchronous thread by themselves, is there a difference between this and the current FLIP's?
> > > > > > For q2: Block Main Thread > > You're right, the queue data will be stored in ListState, which is an OperatorState. In fact, for ListState storage the theoretical upper limit is Integer.MAX_VALUE, but we can't make the queue capacity too big in production because the risk of OOM increases as the queue capacity grows; increasing the task parallelism may be a more viable way when a single task encounters too many retry items. > > We recommend a proper estimate of queue capacity based on a formula like 'inputRate * retryRate * avgRetryDuration', and also the actual checkpoint duration at runtime. > > > > > If I understand correctly, the retry queue will be put into ListState; is this state an OperatorState? As far as I know, OperatorState does not have the ability to store a lot of data. > > So when we need to retry more data, will we need to block the main thread? What is the maximum size of the default retry queue? > > > > Best, > > Lincoln Lee > > > > Jingsong Li 于2022年5月16日周一 10:31写道: > > > Thanks Lincoln for the proposal. > > > > > > ## Motivation: > > > > > > > asyncInvoke and callback functions are executed synchronously by the main thread, which is not suitable for adding long-time blocking operations, and introducing an additional thread will bring extra complexity for users > > > > > > According to the documentation of AsyncFunction: > > > > > > > For each #asyncInvoke, an async io operation can be triggered, and once it has been done, the result can be collected by calling {@link ResultFuture#complete}. For each async operation, its context is stored in the operator immediately after invoking #asyncInvoke, avoiding blocking for each stream input as long as the internal buffer is not full. > > > > > > The real external call should happen in the asynchronous thread.
> > > > > > My question is: If the user makes a retry in this asynchronous thread by themselves, is there a difference between this and the current FLIP's? > > > > > > ## Block Main Thread > > > > > > If I understand correctly, the retry queue will be put into ListState; is this state an OperatorState? As far as I know, OperatorState does not have the ability to store a lot of data. > > > So when we need to retry more data, will we need to block the main thread? What is the maximum size of the default retry queue? > > > > > > Best, > > > Jingsong > > > > > > On Thu, May 12, 2022 at 8:56 PM Lincoln Lee wrote: > > > > > > > Dear Flink developers, > > > > > > > > I would like to open a discussion on FLIP-232 [1], for an extension of AsyncWaitOperator to support retry for the user's asyncFunction. > > > > > > > > To do so, a new user interface will be added to define the trigger condition for retry and when to retry. Internally, a delayed retry mechanism will be introduced. > > > > > > > > There's a PoC for this FLIP [2][3]; thanks Yun Gao for offline discussions and valuable comments. > > > >
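The capacity rule of thumb quoted in the thread ('inputRate * retryRate * avgRetryDuration') is easy to turn into a back-of-the-envelope calculation. The helper name and example numbers below are made up for illustration; they are not part of the FLIP:

```java
// Back-of-the-envelope sizing for the async retry queue, following the
// formula suggested in the thread: inputRate * retryRate * avgRetryDuration.
// The helper name and the example numbers are illustrative, not from the FLIP.
class RetryQueueSizing {
    /**
     * @param inputRatePerSec     records entering the operator per second
     * @param retryRate           fraction of records that need a retry (0..1)
     * @param avgRetryDurationSec average time a record spends retrying, in seconds
     * @return estimated number of in-flight retry entries the queue must hold
     */
    static long estimateCapacity(double inputRatePerSec,
                                 double retryRate,
                                 double avgRetryDurationSec) {
        return (long) Math.ceil(inputRatePerSec * retryRate * avgRetryDurationSec);
    }

    public static void main(String[] args) {
        // e.g. 1000 rec/s, 5% of records retried, each retry lasting ~2s on average
        System.out.println(estimateCapacity(1000, 0.05, 2.0)); // 100
    }
}
```

As the thread notes, this estimate should then be sanity-checked against the actual checkpoint duration at runtime, since queued entries are persisted in (operator) state on each checkpoint.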