[PR] [FLINK-33211][table] support flink table lineage [flink]
HuangZhenQiu opened a new pull request, #25012: URL: https://github.com/apache/flink/pull/25012

## What is the purpose of the change

1. Add a Table Lineage Vertex into the transformations in the planner. The final LineageGraph is generated from the transformations and put into the StreamGraph. The lineage graph will be published to the Lineage Listener in a follow-up PR.
2. Deprecated table sources and sinks are not considered, as they do not expose enough information to derive the name and namespace of a lineage dataset.

## Brief change log

- Add table lineage interfaces and default implementations.
- Create lineage vertices and add them to transformations in the phase of physical plan to transformation conversion.

## Verifying this change

1. Added TableLineageGraphTest for both stream and batch.
2. Added LineageGraph verification in TransformationsTest for legacy sources.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
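For readers unfamiliar with the lineage terms used above, the following is a minimal, hypothetical sketch of a lineage vertex that carries dataset name/namespace information. It is not the interface added by this PR; all type and method names here are made up for illustration only.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: illustrates the general shape of "lineage vertex" and
// "lineage dataset" concepts, not Flink's actual API.
final class LineageSketch {

    /** A dataset participating in lineage, identified by a name and a namespace. */
    interface LineageDataset {
        String name();       // e.g. a table identifier
        String namespace();  // e.g. a connector location such as "kafka://broker:9092"
    }

    /** A lineage vertex groups the datasets produced or consumed by one transformation. */
    interface LineageVertex {
        List<LineageDataset> datasets();
    }

    static LineageVertex vertexOf(String name, String namespace) {
        LineageDataset dataset =
                new LineageDataset() {
                    @Override
                    public String name() {
                        return name;
                    }

                    @Override
                    public String namespace() {
                        return namespace;
                    }
                };
        return () -> Collections.singletonList(dataset);
    }

    public static void main(String[] args) {
        LineageVertex sinkVertex =
                vertexOf("default_catalog.default_database.orders", "kafka://broker:9092");
        sinkVertex.datasets().forEach(d -> System.out.println(d.namespace() + "/" + d.name()));
    }
}
```

In the PR's design, such vertices are attached to transformations so that the final lineage graph can be assembled when the StreamGraph is built.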
[jira] [Commented] (FLINK-35697) Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink
[ https://issues.apache.org/jira/browse/FLINK-35697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862672#comment-17862672 ] Muhammet Orazov commented on FLINK-35697: - Hey [~chalixar], Thanks for the update! I'll look into it and try to finalise it in the next days.
> Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink
> ------------------------------------------------------------------------------
> Key: FLINK-35697
> URL: https://issues.apache.org/jira/browse/FLINK-35697
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Common
> Reporter: Ahmed Hamdy
> Priority: Blocker
> Fix For: 1.20.0
>
> h2. Description
> In FLIP-451 we added a timeout configuration to {{AsyncSinkWriter}}, with a default value of 10 minutes and failOnTimeout defaulting to false.
> We need to test the new feature on different levels:
> - Functional Testing
> - Performance Testing
> - Regression Testing
> h2. Common Utils
> The feature affects the abstract {{AsyncSinkWriter}} class, so we need an implementation sink for our tests. Any implementation where we can track delivery of elements is acceptable, for example:
> {code}
> class DiscardingElementWriter extends AsyncSinkWriter<String, String> {
>
>     SeparateThreadExecutor executor =
>             new SeparateThreadExecutor(r -> new Thread(r, "DiscardingElementWriter"));
>
>     public DiscardingElementWriter(
>             Sink.InitContext context,
>             AsyncSinkWriterConfiguration configuration,
>             Collection<BufferedRequestState<String>> bufferedRequestStates) {
>         super(
>                 (element, context1) -> element.toString(),
>                 context,
>                 configuration,
>                 bufferedRequestStates);
>     }
>
>     @Override
>     protected long getSizeInBytes(String requestEntry) {
>         return requestEntry.length();
>     }
>
>     @Override
>     protected void submitRequestEntries(
>             List<String> requestEntries, ResultHandler<String> resultHandler) {
>         executor.execute(
>                 () -> {
>                     long delayMillis = new Random().nextInt(5000);
>                     try {
>                         Thread.sleep(delayMillis);
>                     } catch (InterruptedException ignored) {
>                     }
>                     for (String entry : requestEntries) {
>                         LOG.info("Discarding {} after {} ms", entry, delayMillis);
>                     }
>                     resultHandler.complete();
>                 });
>     }
> }
> {code}
> We will also need a simple Flink job that writes data using the sink:
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.fromSequence(0, 100)
>         .map(Object::toString)
>         .sinkTo(new DiscardingTestAsyncSink<>());
> {code}
> We can use the smallest values for batch size and in-flight requests to increase the number of requests that are subject to timeout:
> {code}
> public class DiscardingTestAsyncSink<T> extends AsyncSinkBase<T, String> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(DiscardingTestAsyncSink.class);
>
>     public DiscardingTestAsyncSink(long requestTimeoutMS, boolean failOnTimeout) {
>         super(
>                 (element, context) -> element.toString(),
>                 1, // maxBatchSize
>                 1, // maxInflightRequests
>                 10, // maxBufferedRequests
>                 1000L, // maxBatchSizeInBytes
>                 100, // maxTimeInBufferMS
>                 500L, // maxRecordSizeInBytes
>                 requestTimeoutMS,
>                 failOnTimeout);
>     }
>
>     @Override
>     public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
>         return new DiscardingElementWriter(
>                 new InitContextWrapper(context),
>                 AsyncSinkWriterConfiguration.builder()
>                         .setMaxBatchSize(this.getMaxBatchSize())
>                         .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
>                         .setMaxInFlightRequests(this.getMaxInFlightRequests())
>                         .setMaxBufferedRequests(this.getMaxBufferedRequests())
>                         .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
>                         .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
>                         .setFailOnTimeout(this.getFailOnTimeout())
>                         .setRequestTimeoutMS(this.getRequestTimeoutMS())
>                         .build(),
>                 Collections.emptyList());
>     }
> }
> {code}
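As a quick illustration of the "Functional Testing" level described above, a minimal smoke-test driver could reuse the classes quoted in the issue. This is only a hedged sketch: it assumes `DiscardingTestAsyncSink` from the issue is on the classpath, and the expected behaviour (job completes despite slow requests when `failOnTimeout = false`) is exactly what the functional test would need to verify from logs/metrics.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: DiscardingTestAsyncSink is the test class defined in the issue above.
public class RequestTimeoutSmokeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1 second request timeout, failOnTimeout = false: requests slower than the
        // timeout should not fail the job; delivery of all elements is what we check.
        env.fromSequence(0, 100)
                .map(Object::toString)
                .sinkTo(new DiscardingTestAsyncSink<>(1000L, false));

        env.execute("async-sink-timeout-smoke-test");
    }
}
```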
[jira] [Created] (FLINK-35747) customer ‘rest.bind-address' config overwrite by code
dncba created FLINK-35747: - Summary: customer ‘rest.bind-address' config overwrite by code Key: FLINK-35747 URL: https://issues.apache.org/jira/browse/FLINK-35747 Project: Flink Issue Type: Bug Components: Deployment / YARN Affects Versions: 1.19.1 Reporter: dncba

When I want the Flink-on-YARN web UI to bind to 0.0.0.0 so that it listens on an IPv4 & IPv6 dual stack, I found that the 'rest.bind-address' config is automatically overwritten here:

{code:java}
package org.apache.flink.yarn.entrypoint;

public class YarnEntrypointUtils {

    public static Configuration loadConfiguration(...) {
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(workingDirectory, dynamicParameters);

        final String hostname = env.get(ApplicationConstants.Environment.NM_HOST.key());
        configuration.set(JobManagerOptions.ADDRESS, hostname);
        configuration.set(RestOptions.ADDRESS, hostname);
        // hostname is overwritten by code here
        configuration.set(RestOptions.BIND_ADDRESS, hostname);
    }
}
{code}

In most cases this is right, but when users want to configure 'rest.bind-address' themselves, their custom config is silently overwritten. The better way is to check the user config before overwriting, like this:

{code:java}
public class YarnEntrypointUtils {

    public static Configuration loadConfiguration(...) {
        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(workingDirectory, dynamicParameters);

        final String hostname = env.get(ApplicationConstants.Environment.NM_HOST.key());
        configuration.set(JobManagerOptions.ADDRESS, hostname);
        configuration.set(RestOptions.ADDRESS, hostname);
        // check before the overwrite
        String bindAddress = configuration.getString(RestOptions.BIND_ADDRESS);
        if (StringUtils.isBlank(bindAddress)) {
            configuration.setString(RestOptions.BIND_ADDRESS, hostname);
        }
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34487) Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly workflow
[ https://issues.apache.org/jira/browse/FLINK-34487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862670#comment-17862670 ] Muhammet Orazov commented on FLINK-34487: - Hey [~mapohl] , Sure I will create similar PR for other branches (fingers crossed) > Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly > workflow > - > > Key: FLINK-34487 > URL: https://issues.apache.org/jira/browse/FLINK-34487 > Project: Flink > Issue Type: Sub-task > Components: Build System / CI >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Assignee: Muhammet Orazov >Priority: Major > Labels: github-actions, pull-request-available > Fix For: 2.0.0 > > > Analogously to the [Azure Pipelines nightly > config|https://github.com/apache/flink/blob/e923d4060b6dabe650a8950774d176d3e92437c2/tools/azure-pipelines/build-apache-repo.yml#L183] > we want to generate the wheels artifacts in the GHA nightly workflow as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]
morazow commented on PR #24426: URL: https://github.com/apache/flink/pull/24426#issuecomment-2205212666 Great, thanks @XComp, @HuangXingBo for the help 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-33211) Implement table lineage graph
[ https://issues.apache.org/jira/browse/FLINK-33211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862668#comment-17862668 ] Matthias Pohl edited comment on FLINK-33211 at 7/3/24 6:40 AM: --- I revert [960363cd3c6c82f7e56ef781295756105f7b5eba|https://github.com/apache/flink/commit/960363cd3c6c82f7e56ef781295756105f7b5eba] due to the [compilation failures|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60624&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=6598] in {{master}} build 20240703.5 to unblock {{master}} again. I decided against just fixing it because it appears to be a conflict resolution issue. Please go ahead and create another PR and make sure that there are not other hidden conflicts introduced with this change. Sorry for the extra work. Additionally, please be transparent about the fixed version and commit information in the Jira issues: It's common practice to update the Fix Version in the Jira issues and add the commit hash and branch as part of the resolution comment. was (Author: mapohl): I revert [960363cd3c6c82f7e56ef781295756105f7b5eba|https://github.com/apache/flink/commit/960363cd3c6c82f7e56ef781295756105f7b5eba] due to the [compilation failures|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60624&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=6598] in {{master}} build 20240703.5. I decided against just fixing it because it appears to be a conflict resolution issue. Please go ahead and create another PR and make sure that there are not other hidden conflicts introduced with this change. Sorry for the extra work. Additionally, please be transparent about the fixed version and commit information in the Jira issues: It's common practice to update the Fix Version in the Jira issues and add the commit hash and branch as part of the resolution comment. > Implement table lineage graph > - > > Key: FLINK-33211 > URL: https://issues.apache.org/jira/browse/FLINK-33211 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Fang Yong >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Implement table lineage graph -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (FLINK-33211) Implement table lineage graph
[ https://issues.apache.org/jira/browse/FLINK-33211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl reopened FLINK-33211: --- I revert [960363cd3c6c82f7e56ef781295756105f7b5eba|https://github.com/apache/flink/commit/960363cd3c6c82f7e56ef781295756105f7b5eba] due to the [compilation failures|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60624&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=6598] in {{master}} build 20240703.5. I decided against just fixing it because it appears to be a conflict resolution issue. Please go ahead and create another PR and make sure that there are not other hidden conflicts introduced with this change. Sorry for the extra work. Additionally, please be transparent about the fixed version and commit information in the Jira issues: It's common practice to update the Fix Version in the Jira issues and add the commit hash and branch as part of the resolution comment. > Implement table lineage graph > - > > Key: FLINK-33211 > URL: https://issues.apache.org/jira/browse/FLINK-33211 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Fang Yong >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Implement table lineage graph -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35746) Add logic to override observed config based on settings observed through REST API
Gyula Fora created FLINK-35746: -- Summary: Add logic to override observed config based on settings observed through REST API Key: FLINK-35746 URL: https://issues.apache.org/jira/browse/FLINK-35746 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora Fix For: kubernetes-operator-1.10.0 In many cases the Flink operator relies on the user configuration to infer the running application/cluster settings. While this is mostly fine, many of these configs can be changed programmatically by the user in their application's main method; those changes ultimately take precedence and can lead to inconsistent behaviour on the operator's side. To alleviate this, we need to add logic to override parts of the observed config based on what we see from the cluster, such as whether checkpointing is enabled or disabled, checkpoint intervals, etc. -- This message was sent by Atlassian Jira (v8.20.10#820010)
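To make the idea concrete, here is a minimal, hypothetical sketch of "prefer what the cluster reports over what the user deployed with". It is not the operator's actual code; the `RestObservedSettings` holder and the override method are invented for illustration, while `execution.checkpointing.interval` is used only as an example key.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative sketch of overriding parts of the observed config with values
// observed through the REST API (e.g. checkpoint interval set in main()).
final class ObservedConfigOverride {

    /** Values reported by the running job via the REST API (hypothetical holder). */
    static final class RestObservedSettings {
        final Optional<Duration> checkpointInterval; // empty => checkpointing disabled

        RestObservedSettings(Optional<Duration> checkpointInterval) {
            this.checkpointInterval = checkpointInterval;
        }
    }

    /** Returns the user config with REST-observed values taking precedence. */
    static Map<String, String> override(Map<String, String> userConfig, RestObservedSettings observed) {
        Map<String, String> effective = new HashMap<>(userConfig);
        if (observed.checkpointInterval.isPresent()) {
            effective.put(
                    "execution.checkpointing.interval",
                    observed.checkpointInterval.get().toMillis() + " ms");
        } else {
            // Checkpointing was disabled programmatically in the application's main method.
            effective.remove("execution.checkpointing.interval");
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("execution.checkpointing.interval", "60000 ms");
        // The REST API reports a 30s interval that was set in the app's main() method.
        Map<String, String> effective =
                override(userConfig, new RestObservedSettings(Optional.of(Duration.ofSeconds(30))));
        System.out.println(effective);
    }
}
```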
Re: [PR] [FLINK-35552][runtime] Moves CheckpointStatsTracker out of DefaultExecutionGraphFactory into Scheduler [flink]
XComp commented on PR #24911: URL: https://github.com/apache/flink/pull/24911#issuecomment-2205155149 [CI with AdaptiveScheduler enabled](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=60616) was successful. I'm gonna go ahead and prepare this PR to be merged (i.e. remove the DO-NOT-MERG commit). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (FLINK-34487) Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly workflow
[ https://issues.apache.org/jira/browse/FLINK-34487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl reopened FLINK-34487: --- Ok, I was wrong with my assumption that we don't need backports. The Python wheels are created for [master|https://github.com/apache/flink/actions/runs/9770777281] but not for the other nightlies (e.g. [release-1.19|https://github.com/apache/flink/actions/runs/9770777344]). [~m.orazow] Can you prepare backports for 1.20, 1.19 and 1.18? > Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly > workflow > - > > Key: FLINK-34487 > URL: https://issues.apache.org/jira/browse/FLINK-34487 > Project: Flink > Issue Type: Sub-task > Components: Build System / CI >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Assignee: Muhammet Orazov >Priority: Major > Labels: github-actions, pull-request-available > Fix For: 2.0.0 > > > Analogously to the [Azure Pipelines nightly > config|https://github.com/apache/flink/blob/e923d4060b6dabe650a8950774d176d3e92437c2/tools/azure-pipelines/build-apache-repo.yml#L183] > we want to generate the wheels artifacts in the GHA nightly workflow as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35318][table-planner] use UTC timezone to handle TIMESTAMP_WITHOUT_TIME_ZONE type in RexNodeToExpressionConverter [flink]
leonardBang commented on code in PR #24787: URL: https://github.com/apache/flink/pull/24787#discussion_r1663518072

## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala:
## @@ -455,7 +449,7 @@ class RexNodeToExpressionConverter(
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
         val v = literal.getValueAs(classOf[TimestampString])
-        toLocalDateTime(v).atZone(timeZone.toZoneId).toInstant
+        toLocalDateTime(v).atZone(ZoneId.of(ZoneOffset.UTC.getId)).toInstant

Review Comment: Thanks @lshangq for the information, evaluating it once makes sense to me.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
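The point of the diff above can be shown with a small self-contained demo: converting the same `LocalDateTime` to an `Instant` through different zones yields different epoch values, which is why the literal conversion is pinned to UTC to stay deterministic. The chosen date and zone below are arbitrary examples.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Demonstrates why a TIMESTAMP_WITHOUT_TIME_ZONE literal must be converted
// through a fixed zone (UTC) rather than the session/local time zone.
public class TimestampLiteralDemo {
    public static void main(String[] args) {
        LocalDateTime literal = LocalDateTime.of(2024, 7, 3, 12, 0, 0);

        Instant viaUtc = literal.atZone(ZoneId.of(ZoneOffset.UTC.getId())).toInstant();
        Instant viaShanghai = literal.atZone(ZoneId.of("Asia/Shanghai")).toInstant();

        System.out.println("UTC      -> " + viaUtc);      // 2024-07-03T12:00:00Z
        System.out.println("Shanghai -> " + viaShanghai); // 2024-07-03T04:00:00Z
    }
}
```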
[PR] [cdc-connector][jdbc-db2]Flink cdc pipeline support db2 source [flink-cdc]
ChengJie1053 opened a new pull request, #3450: URL: https://github.com/apache/flink-cdc/pull/3450 Flink cdc pipeline support db2 source ![image](https://github.com/apache/flink-cdc/assets/125547374/69eadc27-6a90-4bf4-97e1-4a6746d14db8) ![image](https://github.com/apache/flink-cdc/assets/125547374/95ed055b-f2f4-4410-aad9-059870714c11) ![image](https://github.com/apache/flink-cdc/assets/125547374/ffbdc504-8675-40ad-8a17-2e1de4995a8f) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35643][doc] Add materialized table statement doc [flink]
Myasuka commented on code in PR #24975: URL: https://github.com/apache/flink/pull/24975#discussion_r1663472146 ## docs/content/docs/dev/table/materialized-table/statements.md: ## @@ -0,0 +1,344 @@ +--- +title: Statements +weight: 2 +type: docs +aliases: +- /dev/table/materialized-table/statements.html +--- + + +# Description + +Flink SQL supports the following Materialized Table statements for now: +- [CREATE MATERIALIZED TABLE](#create-materialized-table) +- [Alter MATERIALIZED TABLE](#alter-materialized-table) +- [DROP MATERIALIZED TABLE](#drop-materialized-table) + +# CREATE MATERIALIZED TABLE + +``` +CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name + +[ ([ ]) ] + +[COMMENT table_comment] + +[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] + +[WITH (key1=val1, key2=val2, ...)] + +FRESHNESS = INTERVAL '' { SECOND | MINUTE | HOUR | DAY } + +[REFRESH_MODE = { CONTINUOUS | FULL }] + +AS + +: + [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED +``` + +### PRIMARY KEY + +PRIMARY KEY defines an optional list of columns that uniquely identifies each row within the table. The column as the primary key must be non-null. + +### PARTITIONED BY + +PARTITIONED BY define an optional list of columns to partition the materialized table. A directory is created for each partition if this materialized table is used as a filesystem sink. + +**Example:** + +```sql +-- Create a materialized table and specify the partition field as `ds`. +CREATE MATERIALIZED TABLE my_materialized_table +PARTITIONED BY (ds) +FRESHNESS = INTERVAL '1' HOUR +AS SELECT +ds +FROM +... +``` + +Note +- The partition column must be included in the query statement of the materialized table. + +### WITH Options + +WITH Options are used to specify the materialized table properties, including [connector options]({{< ref "docs/connectors/table/" >}}) and [time format option]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) for partition fields. + +```sql +-- Create a materialized table, specify the partition field as 'ds', and the corresponding time format as '-MM-dd' +CREATE MATERIALIZED TABLE my_materialized_table +PARTITIONED BY (ds) +WITH ( +'format' = 'json', +'partition.fields.ds.date-formatter' = '-MM-dd' +) +... +``` + +As shown in the above example, we specified the date-formatter option for the ds partition column. During each scheduling, the scheduling time will be converted to the ds partition value. For example, for a scheduling time of 2024-01-01 00:00:00, only the partition ds = '2024-01-01' will be refreshed. + +Note +- The `partition.fields.#.date-formatter` option only works in full mode. +- The field in the [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) must be a valid string type partition field. + +### FRESHNESS + +**FRESHNESS Definition and Refresh Mode Relationship** + +FRESHNESS defines the maximum amount of time that the materialized table’s content should lag behind updates to the base tables. It does two things, firstly it determines the [refresh mode]({{< ref "docs/dev/table/materialized-table/overview" >}}#refresh-mode) of the materialized table through [configuration]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold), followed by determines the data refresh frequency to meet the actual data freshness requirements. 
+ +**Detailed Explanation of FRESHNESS Parameter** + +The FRESHNESS parameter range is INTERVAL `''` { SECOND | MINUTE | HOUR | DAY }. `''` must be a positive integer, and in FULL mode, `''` should be a common divisor of the respective time interval. + +**Examples:** +(Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes) + +```sql +-- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 1 second +FRESHNESS = INTERVAL '1' SECOND Review Comment: Since current checkpoint interval is bounded to the settings of `freshness`, I think we should warn users that the stronger freshness would introduce more impact to the checkpoint, we can tune the freshness longer and tell users to consider [changelog state-backend](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#enabling-changelog). ## docs/content/docs/dev/table/materialized-table/overview.md: ## @@ -0,0 +1,65 @@ +--- +title: Overview +weight: 1 +type: docs +aliases: +- /dev/table/materialized-table.html +--- + + +# Introduction + +Materialized Table is a new table type introduced in Flink SQL, aimed at simplifying both batch and stream data pipelines, providing a consistent development experience. By specifying data freshness and query when creating Materialized, the engine automatically derives the schema for the materialized table and creates corresponding data ref
[jira] [Commented] (FLINK-35690) Release Testing: Verify FLIP-459: Support Flink hybrid shuffle integration with Apache Celeborn
[ https://issues.apache.org/jira/browse/FLINK-35690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862653#comment-17862653 ] xuhuang commented on FLINK-35690: - Hi [~tanyuxin]. I followed your steps to implement a basic workflow, applying Celeborn as a single layer in Hybrid Shuffle. My test results show that Flink can read, write and store shuffle data through Celeborn, and the job finally completed successfully. :P
> Release Testing: Verify FLIP-459: Support Flink hybrid shuffle integration with Apache Celeborn
> -----------------------------------------------------------------------------------------------
> Key: FLINK-35690
> URL: https://issues.apache.org/jira/browse/FLINK-35690
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network
> Reporter: Yuxin Tan
> Assignee: xuhuang
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.20.0
>
> Follow-up of the test for https://issues.apache.org/jira/browse/FLINK-35533
> In Flink 1.20, we proposed integrating Flink's Hybrid Shuffle with Apache Celeborn through a pluggable remote tier interface. To verify this feature, you should follow these two main steps.
> 1. Implement the Celeborn tier.
> * Implement a new tier factory and tier for Celeborn, covering the TierFactory/TierMasterAgent/TierProducerAgent/TierConsumerAgent APIs.
> * The implementations should support granular data management at the Segment level for both the client and server sides.
> 2. Use the implemented tier to shuffle data.
> * Compile Flink and Celeborn.
> * Deploy the Celeborn service.
> ** Deploy a new Celeborn service with the newly compiled packages. You can reference the doc ([https://celeborn.apache.org/docs/latest/]) to deploy the cluster.
> * Add the compiled Flink plugin jar (celeborn-client-flink-xxx.jar) to the Flink classpath.
> * Configure the options to enable the feature.
> ** Configure the option taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class to the new Celeborn tier classes. Besides this option, the following options should also be added.
> {code:java}
> execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL
> celeborn.master.endpoints: 
> celeborn.client.shuffle.partition.type: MAP{code}
> * Run some test examples (e.g., WordCount) to verify the feature.
>
-- This message was sent by Atlassian Jira (v8.20.10#820010)
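For the last step above ("run some test examples, e.g., WordCount"), a minimal batch job of that kind could look like the sketch below. It assumes the Celeborn and hybrid-shuffle options listed in the issue are already set in the cluster's Flink configuration; the job itself contains nothing Celeborn-specific.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Minimal batch WordCount used purely as a test job; the shuffle between
// flatMap and the keyed aggregation is what exercises hybrid shuffle.
public class HybridShuffleWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // batch job, so hybrid shuffle applies

        env.fromElements("to be or not to be", "that is the question")
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1)
                .print();

        env.execute("hybrid-shuffle-wordcount");
    }
}
```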
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
yuxiqian commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1663450759 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.transform; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.runtime.parser.TransformParser; +import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * The processor of transform projection applies to process a row of filtering tables. + * + * A transform projection processor contains: + * + * + * CreateTableEvent: add the user-defined computed columns into Schema. + * SchemaChangeEvent: update the columns of TransformProjection. + * DataChangeEvent: Fill data field to row in PreTransformOperator. Process the data column + * and the user-defined expression computed columns. 
+ * + */ +public class PostTransformProcessor { +private static final Logger LOG = LoggerFactory.getLogger(PostTransformProcessor.class); +private TableInfo tableInfo; +private TableChangeInfo tableChangeInfo; +private TransformProjection transformProjection; +private @Nullable TransformFilter transformFilter; +private String timezone; +private Map projectionColumnProcessorMap; +private List cachedProjectionColumns; + +public PostTransformProcessor( +TableInfo tableInfo, +TableChangeInfo tableChangeInfo, +TransformProjection transformProjection, +@Nullable TransformFilter transformFilter, +String timezone) { +this.tableInfo = tableInfo; +this.tableChangeInfo = tableChangeInfo; +this.transformProjection = transformProjection; +this.transformFilter = transformFilter; +this.timezone = timezone; +this.projectionColumnProcessorMap = new ConcurrentHashMap<>(); +this.cachedProjectionColumns = cacheProjectionColumnMap(tableInfo, transformProjection); +} + +public boolean hasTableChangeInfo() { +return this.tableChangeInfo != null; +} + +public boolean hasTableInfo() { +return this.tableInfo != null; +} + +public static PostTransformProcessor of( +TableInfo tableInfo, +TransformProjection transformProjection, +TransformFilter transformFilter, +String timezone) { +return new PostTransformProcessor( +tableInfo, null, transformProjection, transformFilter, timezone); +} + +public static PostTransformProcessor of( +TableChangeInfo tableChangeInfo, +TransformProjection transformProjection, +TransformFilter transformFilter) { +return new PostTransformProcessor( +null, tableChangeInfo, transformProjection, transformFilter, null); +} + +public static PostTransformProcessor of( +TransformProjection transformProjection, TransformFilter transformFilter) { +return new PostTransformProcessor(null, null, transformProjection, transformFilter, null); +} + +public Schema processSchemaChangeEvent(Schema schema) { +List projectionColumns = +TransformParser.generateProjectionColumns( +transformProjection.getProjection(), schema.getColumns()); +transformProjection.setProjectionColumns(projectionColumns); +return schema.copy( +projectionColumns.stream() +.map(ProjectionColumn::getColumn) +.collect(Collectors.toList()));
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
melin commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1663449011 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.transform; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.runtime.parser.TransformParser; +import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * The processor of transform projection applies to process a row of filtering tables. + * + * A transform projection processor contains: + * + * + * CreateTableEvent: add the user-defined computed columns into Schema. + * SchemaChangeEvent: update the columns of TransformProjection. + * DataChangeEvent: Fill data field to row in PreTransformOperator. Process the data column + * and the user-defined expression computed columns. 
+ * + */ +public class PostTransformProcessor { +private static final Logger LOG = LoggerFactory.getLogger(PostTransformProcessor.class); +private TableInfo tableInfo; +private TableChangeInfo tableChangeInfo; +private TransformProjection transformProjection; +private @Nullable TransformFilter transformFilter; +private String timezone; +private Map projectionColumnProcessorMap; +private List cachedProjectionColumns; + +public PostTransformProcessor( +TableInfo tableInfo, +TableChangeInfo tableChangeInfo, +TransformProjection transformProjection, +@Nullable TransformFilter transformFilter, +String timezone) { +this.tableInfo = tableInfo; +this.tableChangeInfo = tableChangeInfo; +this.transformProjection = transformProjection; +this.transformFilter = transformFilter; +this.timezone = timezone; +this.projectionColumnProcessorMap = new ConcurrentHashMap<>(); +this.cachedProjectionColumns = cacheProjectionColumnMap(tableInfo, transformProjection); +} + +public boolean hasTableChangeInfo() { +return this.tableChangeInfo != null; +} + +public boolean hasTableInfo() { +return this.tableInfo != null; +} + +public static PostTransformProcessor of( +TableInfo tableInfo, +TransformProjection transformProjection, +TransformFilter transformFilter, +String timezone) { +return new PostTransformProcessor( +tableInfo, null, transformProjection, transformFilter, timezone); +} + +public static PostTransformProcessor of( +TableChangeInfo tableChangeInfo, +TransformProjection transformProjection, +TransformFilter transformFilter) { +return new PostTransformProcessor( +null, tableChangeInfo, transformProjection, transformFilter, null); +} + +public static PostTransformProcessor of( +TransformProjection transformProjection, TransformFilter transformFilter) { +return new PostTransformProcessor(null, null, transformProjection, transformFilter, null); +} + +public Schema processSchemaChangeEvent(Schema schema) { +List projectionColumns = +TransformParser.generateProjectionColumns( +transformProjection.getProjection(), schema.getColumns()); +transformProjection.setProjectionColumns(projectionColumns); +return schema.copy( +projectionColumns.stream() +.map(ProjectionColumn::getColumn) +.collect(Collectors.toList())); +
[jira] [Resolved] (FLINK-35733) Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull
[ https://issues.apache.org/jira/browse/FLINK-35733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan resolved FLINK-35733. - Fix Version/s: 2.0.0 Assignee: RocMarshal Resolution: Fixed > Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull > > > Key: FLINK-35733 > URL: https://issues.apache.org/jira/browse/FLINK-35733 > Project: Flink > Issue Type: Technical Debt >Reporter: RocMarshal >Assignee: RocMarshal >Priority: Minor > Labels: pull-request-available > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35733) Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull
[ https://issues.apache.org/jira/browse/FLINK-35733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862642#comment-17862642 ] Rui Fan commented on FLINK-35733: - Merged to 2.0.0 via: * 1db333158b8d0d338e4d116d39c65466066ef0c7 * b782acfb9c04a18d8adb8e6596bd22e0e5ebe5c3 > Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull > > > Key: FLINK-35733 > URL: https://issues.apache.org/jira/browse/FLINK-35733 > Project: Flink > Issue Type: Technical Debt >Reporter: RocMarshal >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
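For context, the change tracked here is a simple annotation swap. The method below is made up purely to illustrate it; only the import and annotation are the point.

```java
import javax.annotation.Nonnull;

// Before: the parameter was annotated with @org.jetbrains.annotations.NotNull.
// After: the JSR-305 annotation (javax.annotation.Nonnull) is used instead.
public class NonnullExample {

    public String normalize(@Nonnull String input) {
        return input.trim().toLowerCase();
    }
}
```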
[jira] [Updated] (FLINK-35733) Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull
[ https://issues.apache.org/jira/browse/FLINK-35733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35733: --- Labels: pull-request-available (was: ) > Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull > > > Key: FLINK-35733 > URL: https://issues.apache.org/jira/browse/FLINK-35733 > Project: Flink > Issue Type: Technical Debt >Reporter: RocMarshal >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35733] Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull [flink]
1996fanrui merged PR #25001: URL: https://github.com/apache/flink/pull/25001 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35745) Add namespace convention doc for Flink lineage
Zhenqiu Huang created FLINK-35745: - Summary: Add namespace convention doc for Flink lineage Key: FLINK-35745 URL: https://issues.apache.org/jira/browse/FLINK-35745 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: Zhenqiu Huang We will recommend following the naming convention from OpenLineage: https://openlineage.io/docs/spec/naming/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-33211) Implement table lineage graph
[ https://issues.apache.org/jira/browse/FLINK-33211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenqiu Huang closed FLINK-33211. - Resolution: Done The PR is merged to upstream > Implement table lineage graph > - > > Key: FLINK-33211 > URL: https://issues.apache.org/jira/browse/FLINK-33211 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Fang Yong >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Implement table lineage graph -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-33212) Introduce job status changed listener for lineage
[ https://issues.apache.org/jira/browse/FLINK-33212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenqiu Huang closed FLINK-33212. - Resolution: Done The PR is merged to upstream. > Introduce job status changed listener for lineage > - > > Key: FLINK-33212 > URL: https://issues.apache.org/jira/browse/FLINK-33212 > Project: Flink > Issue Type: Sub-task > Components: Runtime / REST >Affects Versions: 1.20.0 >Reporter: Fang Yong >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Introduce job status changed listener relevant interfaces and its > implementation. The job listeners will be registered in runtime and also > client side pipeline executors, including localExecutor, embeddedExecutor for > application mode, and abstract session cluster executor. When job > submission is successfully, the job created event will be created with > lineage graph info. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33211) Implement table lineage graph
[ https://issues.apache.org/jira/browse/FLINK-33211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862638#comment-17862638 ] Zhenqiu Huang commented on FLINK-33211: --- PR is merged to upstream > Implement table lineage graph > - > > Key: FLINK-33211 > URL: https://issues.apache.org/jira/browse/FLINK-33211 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Fang Yong >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Implement table lineage graph -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-35735) Don't list catalogs when closing session
[ https://issues.apache.org/jira/browse/FLINK-35735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang closed FLINK-35735. - Fix Version/s: 2.0.0 Resolution: Fixed > Don't list catalogs when closing session > > > Key: FLINK-35735 > URL: https://issues.apache.org/jira/browse/FLINK-35735 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.20.0 >Reporter: Shengkai Fang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35735) Don't list catalogs when closing session
[ https://issues.apache.org/jira/browse/FLINK-35735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862632#comment-17862632 ] Shengkai Fang commented on FLINK-35735: --- Merged into master: af3e39fa4ce70187166e2f1dfef1ebfeee6d40cb > Don't list catalogs when closing session > > > Key: FLINK-35735 > URL: https://issues.apache.org/jira/browse/FLINK-35735 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.20.0 >Reporter: Shengkai Fang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35735) Don't list catalogs when closing session
[ https://issues.apache.org/jira/browse/FLINK-35735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35735: --- Labels: pull-request-available (was: ) > Don't list catalogs when closing session > > > Key: FLINK-35735 > URL: https://issues.apache.org/jira/browse/FLINK-35735 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.20.0 >Reporter: Shengkai Fang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35735][sql-gateway] Don't list all catalogs when closing session [flink]
fsk119 merged PR #25010: URL: https://github.com/apache/flink/pull/25010 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35318][table-planner] use UTC timezone to handle TIMESTAMP_WITHOUT_TIME_ZONE type in RexNodeToExpressionConverter [flink]
lshangq commented on PR #24787: URL: https://github.com/apache/flink/pull/24787#issuecomment-2204906269 @leonardBang Could you help take a look at it? Or how do you think it should be changed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
FangYongs merged PR #24618: URL: https://github.com/apache/flink/pull/24618 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33211][table] support flink table lineage [flink]
FangYongs commented on PR #24618: URL: https://github.com/apache/flink/pull/24618#issuecomment-2204890224 Thanks @HuangZhenQiu , +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
yuxiqian commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1663376970 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.transform; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.runtime.parser.TransformParser; +import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * The processor of transform projection applies to process a row of filtering tables. + * + * A transform projection processor contains: + * + * + * CreateTableEvent: add the user-defined computed columns into Schema. + * SchemaChangeEvent: update the columns of TransformProjection. + * DataChangeEvent: Fill data field to row in PreTransformOperator. Process the data column + * and the user-defined expression computed columns. 
+ * + */ +public class PostTransformProcessor { +private static final Logger LOG = LoggerFactory.getLogger(PostTransformProcessor.class); +private TableInfo tableInfo; +private TableChangeInfo tableChangeInfo; +private TransformProjection transformProjection; +private @Nullable TransformFilter transformFilter; +private String timezone; +private Map projectionColumnProcessorMap; +private List cachedProjectionColumns; + +public PostTransformProcessor( +TableInfo tableInfo, +TableChangeInfo tableChangeInfo, +TransformProjection transformProjection, +@Nullable TransformFilter transformFilter, +String timezone) { +this.tableInfo = tableInfo; +this.tableChangeInfo = tableChangeInfo; +this.transformProjection = transformProjection; +this.transformFilter = transformFilter; +this.timezone = timezone; +this.projectionColumnProcessorMap = new ConcurrentHashMap<>(); +this.cachedProjectionColumns = cacheProjectionColumnMap(tableInfo, transformProjection); +} + +public boolean hasTableChangeInfo() { +return this.tableChangeInfo != null; +} + +public boolean hasTableInfo() { +return this.tableInfo != null; +} + +public static PostTransformProcessor of( +TableInfo tableInfo, +TransformProjection transformProjection, +TransformFilter transformFilter, +String timezone) { +return new PostTransformProcessor( +tableInfo, null, transformProjection, transformFilter, timezone); +} + +public static PostTransformProcessor of( +TableChangeInfo tableChangeInfo, +TransformProjection transformProjection, +TransformFilter transformFilter) { +return new PostTransformProcessor( +null, tableChangeInfo, transformProjection, transformFilter, null); +} + +public static PostTransformProcessor of( +TransformProjection transformProjection, TransformFilter transformFilter) { +return new PostTransformProcessor(null, null, transformProjection, transformFilter, null); +} + +public Schema processSchemaChangeEvent(Schema schema) { +List projectionColumns = +TransformParser.generateProjectionColumns( +transformProjection.getProjection(), schema.getColumns()); +transformProjection.setProjectionColumns(projectionColumns); +return schema.copy( +projectionColumns.stream() +.map(ProjectionColumn::getColumn) +.collect(Collectors.toList()));
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
lvyanquan commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1663360564 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.transform; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.runtime.parser.TransformParser; +import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * The processor of transform projection applies to process a row of filtering tables. + * + * A transform projection processor contains: + * + * + * CreateTableEvent: add the user-defined computed columns into Schema. + * SchemaChangeEvent: update the columns of TransformProjection. + * DataChangeEvent: Fill data field to row in PreTransformOperator. Process the data column + * and the user-defined expression computed columns. 
+ * + */ +public class PostTransformProcessor { +private static final Logger LOG = LoggerFactory.getLogger(PostTransformProcessor.class); +private TableInfo tableInfo; +private TableChangeInfo tableChangeInfo; +private TransformProjection transformProjection; +private @Nullable TransformFilter transformFilter; +private String timezone; +private Map projectionColumnProcessorMap; +private List cachedProjectionColumns; + +public PostTransformProcessor( +TableInfo tableInfo, +TableChangeInfo tableChangeInfo, +TransformProjection transformProjection, +@Nullable TransformFilter transformFilter, +String timezone) { +this.tableInfo = tableInfo; +this.tableChangeInfo = tableChangeInfo; +this.transformProjection = transformProjection; +this.transformFilter = transformFilter; +this.timezone = timezone; +this.projectionColumnProcessorMap = new ConcurrentHashMap<>(); +this.cachedProjectionColumns = cacheProjectionColumnMap(tableInfo, transformProjection); +} + +public boolean hasTableChangeInfo() { +return this.tableChangeInfo != null; +} + +public boolean hasTableInfo() { +return this.tableInfo != null; +} + +public static PostTransformProcessor of( +TableInfo tableInfo, +TransformProjection transformProjection, +TransformFilter transformFilter, +String timezone) { +return new PostTransformProcessor( +tableInfo, null, transformProjection, transformFilter, timezone); +} + +public static PostTransformProcessor of( +TableChangeInfo tableChangeInfo, +TransformProjection transformProjection, +TransformFilter transformFilter) { +return new PostTransformProcessor( +null, tableChangeInfo, transformProjection, transformFilter, null); +} + +public static PostTransformProcessor of( +TransformProjection transformProjection, TransformFilter transformFilter) { +return new PostTransformProcessor(null, null, transformProjection, transformFilter, null); +} + +public Schema processSchemaChangeEvent(Schema schema) { +List projectionColumns = +TransformParser.generateProjectionColumns( +transformProjection.getProjection(), schema.getColumns()); +transformProjection.setProjectionColumns(projectionColumns); +return schema.copy( +projectionColumns.stream() +.map(ProjectionColumn::getColumn) +.collect(Collectors.toList()));
[jira] [Updated] (FLINK-35736) Add E2e migration scripts for Flink CDC
[ https://issues.apache.org/jira/browse/FLINK-35736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35736: --- Labels: pull-request-available (was: )
> Add E2e migration scripts for Flink CDC
> ---------------------------------------
> Key: FLINK-35736
> URL: https://issues.apache.org/jira/browse/FLINK-35736
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Reporter: yux
> Priority: Major
> Labels: pull-request-available
>
> Currently, there are no E2E migration tests in Flink CDC CI, and it's not very convenient for testers to run migration tests to verify the RC release before each release.
> Adding an automated migration testing script could help verify CDC backwards compatibility better.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35736][tests] Add migration test scripts & CI workflows [flink-cdc]
lvyanquan commented on code in PR #3447: URL: https://github.com/apache/flink-cdc/pull/3447#discussion_r1663361163 ## tools/mig-test/README.md: ## @@ -0,0 +1,36 @@ +# Flink CDC Migration Test Utilities + +## Pipeline Jobs +### Preparation + +1. Install Ruby (macOS has embedded it by default) +2. (Optional) Run `gem install terminal-table` for better display + +### Compile snapshot CDC versions +3. Set `CDC_SOURCE_HOME` to the root directory of the Flink CDC git repository +4. Run `ruby prepare_libs.rb` to download released / compile snapshot CDC versions + +### Run migration tests +5. Enter `conf/` and run `docker compose up -d` to start up test containers +6. Set `FLINK_HOME` to the home directory of Flink +7. Run `ruby run_migration_test.rb` to start testing Review Comment: Considering that this is mainly provided for developers and there are not many steps involved, I can also accept the current mode. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]
lvyanquan commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1663360564 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.transform; + +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.runtime.parser.TransformParser; +import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * The processor of transform projection applies to process a row of filtering tables. + * + * A transform projection processor contains: + * + * + * CreateTableEvent: add the user-defined computed columns into Schema. + * SchemaChangeEvent: update the columns of TransformProjection. + * DataChangeEvent: Fill data field to row in PreTransformOperator. Process the data column + * and the user-defined expression computed columns. 
+ * + */ +public class PostTransformProcessor { +private static final Logger LOG = LoggerFactory.getLogger(PostTransformProcessor.class); +private TableInfo tableInfo; +private TableChangeInfo tableChangeInfo; +private TransformProjection transformProjection; +private @Nullable TransformFilter transformFilter; +private String timezone; +private Map projectionColumnProcessorMap; +private List cachedProjectionColumns; + +public PostTransformProcessor( +TableInfo tableInfo, +TableChangeInfo tableChangeInfo, +TransformProjection transformProjection, +@Nullable TransformFilter transformFilter, +String timezone) { +this.tableInfo = tableInfo; +this.tableChangeInfo = tableChangeInfo; +this.transformProjection = transformProjection; +this.transformFilter = transformFilter; +this.timezone = timezone; +this.projectionColumnProcessorMap = new ConcurrentHashMap<>(); +this.cachedProjectionColumns = cacheProjectionColumnMap(tableInfo, transformProjection); +} + +public boolean hasTableChangeInfo() { +return this.tableChangeInfo != null; +} + +public boolean hasTableInfo() { +return this.tableInfo != null; +} + +public static PostTransformProcessor of( +TableInfo tableInfo, +TransformProjection transformProjection, +TransformFilter transformFilter, +String timezone) { +return new PostTransformProcessor( +tableInfo, null, transformProjection, transformFilter, timezone); +} + +public static PostTransformProcessor of( +TableChangeInfo tableChangeInfo, +TransformProjection transformProjection, +TransformFilter transformFilter) { +return new PostTransformProcessor( +null, tableChangeInfo, transformProjection, transformFilter, null); +} + +public static PostTransformProcessor of( +TransformProjection transformProjection, TransformFilter transformFilter) { +return new PostTransformProcessor(null, null, transformProjection, transformFilter, null); +} + +public Schema processSchemaChangeEvent(Schema schema) { +List projectionColumns = +TransformParser.generateProjectionColumns( +transformProjection.getProjection(), schema.getColumns()); +transformProjection.setProjectionColumns(projectionColumns); +return schema.copy( +projectionColumns.stream() +.map(ProjectionColumn::getColumn) +.collect(Collectors.toList()));
Re: [PR] [FLINK-35743][cdc-runtime] Fix the time zone configuration for temporal functions is not effective [flink-cdc]
yuxiqian commented on code in PR #3449: URL: https://github.com/apache/flink-cdc/pull/3449#discussion_r1663358098 ## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java: ## Review Comment: I could still reproduce `testTimestampDiffTransform` failure when testing time was set to June 1st 04:00. Seems month and year diff will not be 0 when 8 hours gap crossed month / year boundaries, where the output would be `-28800, -480, -8, 0, [0 or -1], [0 or -1]`. ## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java: ## @@ -553,6 +562,26 @@ void testTimestampDiffTransform() throws Exception { Assertions.assertThat( transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) .isEqualTo(new StreamRecord<>(insertEventExpect)); + +DataChangeEvent insertEvent2 = +DataChangeEvent.insertEvent( +TIMESTAMPDIFF_TABLEID, +recordDataGenerator.generate( +new Object[] { +new BinaryStringData("2"), null, null, null, null, null, null +})); +DataChangeEvent insertEventExpect2 = +DataChangeEvent.insertEvent( +TIMESTAMPDIFF_TABLEID, +recordDataGenerator.generate( +new Object[] { +new BinaryStringData("2"), -28800, -480, -8, 0, 0, 0 Review Comment: Ditto. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
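For readers following the review above, a small self-contained sketch of the boundary effect being described: an 8-hour gap keeps the second/minute/hour/day diffs at -28800/-480/-8/0, but a month diff that is computed field-wise flips to -1 as soon as the gap crosses June 1st. The field-wise helper below is an illustrative assumption about how a TIMESTAMPDIFF(MONTH, ...) style function may behave, not the CDC implementation itself.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

public class TimestampDiffBoundarySketch {

    // Hypothetical field-wise month difference: it only compares calendar fields,
    // so any gap that crosses a month boundary yields a non-zero result.
    static long fieldWiseMonthDiff(LocalDateTime from, LocalDateTime to) {
        return (to.getYear() - from.getYear()) * 12L + (to.getMonthValue() - from.getMonthValue());
    }

    public static void main(String[] args) {
        // "Local" event time June 1st 04:00 and the same instant rendered 8 hours earlier (e.g. UTC).
        LocalDateTime local = LocalDateTime.of(2024, 6, 1, 4, 0);
        LocalDateTime utc = local.minusHours(8); // 2024-05-31T20:00

        System.out.println("seconds = " + ChronoUnit.SECONDS.between(local, utc)); // -28800
        System.out.println("minutes = " + ChronoUnit.MINUTES.between(local, utc)); // -480
        System.out.println("hours   = " + ChronoUnit.HOURS.between(local, utc));   // -8
        System.out.println("days    = " + ChronoUnit.DAYS.between(local, utc));    // 0
        // A duration-based month diff is still 0, but a field-wise diff becomes -1
        // once the 8-hour gap crosses the month boundary:
        System.out.println("months (duration-based) = " + ChronoUnit.MONTHS.between(local, utc)); // 0
        System.out.println("months (field-wise)     = " + fieldWiseMonthDiff(local, utc));        // -1
    }
}
```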
[jira] [Updated] (FLINK-35638) Modify OceanBase Docker container to make the test cases runnable on non-Linux systems
[ https://issues.apache.org/jira/browse/FLINK-35638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35638: --- Labels: pull-request-available (was: ) > Modify OceanBase Docker container to make the test cases runnable on > non-Linux systems > -- > > Key: FLINK-35638 > URL: https://issues.apache.org/jira/browse/FLINK-35638 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: He Wang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35638] Refactor OceanBase test cases and remove dependency on host network [flink-cdc]
whhe commented on PR #3439: URL: https://github.com/apache/flink-cdc/pull/3439#issuecomment-2204779815 @GOODBOY008 @ruanhang1993 PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35742] Don't create RocksDB Column Family if task cancellation is in progress [flink]
rkhachatryan commented on PR #25011: URL: https://github.com/apache/flink/pull/25011#issuecomment-2204412181 @flinkbot run azure please -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35742] Don't create RocksDB Column Family if task cancellation is in progress [flink]
rkhachatryan commented on PR #25011: URL: https://github.com/apache/flink/pull/25011#issuecomment-2204034788 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]
gguptp commented on code in PR #146: URL: https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662905134 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.proxy; + +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; +import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException; +import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; +import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; +import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.Shard; +import software.amazon.awssdk.services.dynamodb.model.StreamStatus; +import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** Implementation of the {@link StreamProxy} for DynamoDB streams. 
*/ +public class DynamoDbStreamsProxy implements StreamProxy { + +private final DynamoDbStreamsClient dynamoDbStreamsClient; +private final SdkHttpClient httpClient; +private final Map shardIdToIteratorStore; + +private static final FullJitterBackoff BACKOFF = new FullJitterBackoff(); + +private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsProxy.class); + +public DynamoDbStreamsProxy(DynamoDbStreamsClient dynamoDbStreamsClient, SdkHttpClient httpClient) { +this.dynamoDbStreamsClient = dynamoDbStreamsClient; +this.httpClient = httpClient; +this.shardIdToIteratorStore = new ConcurrentHashMap<>(); +} + +@Override +public List listShards(String streamArn, @Nullable String lastSeenShardId) { +return this.getShardsOfStream(streamArn, lastSeenShardId); +} + +@Override +public GetRecordsResponse getRecords(String streamArn, String shardId, StartingPosition startingPosition) { +String shardIterator = +shardIdToIteratorStore.computeIfAbsent( +shardId, (s) -> getShardIterator(streamArn, s, startingPosition)); + +if (shardIterator == null) { +return null; +} Review Comment: today, in DDB streams API, the only indication of shard being ended is the next shard iterator being null. I think the return null will only ever be used to tell that shard end has reached. I don't have any preference over returning a repsonse or returning null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]
gguptp commented on code in PR #146: URL: https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662908458 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/FullJitterBackoff.java: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.proxy; + +import org.apache.flink.annotation.Internal; + +import java.util.Random; + +/** + * Used to calculate full jitter backoff sleep durations. + * + * @see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/";> + * Exponential Backoff and Jitter + */ +@Internal +public class FullJitterBackoff { + +/** Random seed used to calculate backoff jitter for DynamoDb Streams operations. */ +private final Random seed = new Random(); + +/** + * Calculates the sleep time for full jitter based on the given parameters. + * + * @param baseMillis the base backoff time in milliseconds + * @param maxMillis the maximum backoff time in milliseconds Review Comment: Yes, we'll be using AWS SDK retry policy. This missed cleanup -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
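Since the connector intends to rely on the AWS SDK retry policy and drop this helper, here is a small reference sketch of the "full jitter" strategy from the linked AWS blog post: sleep a random amount between 0 and min(cap, base * 2^attempt). The attempt capping and random source are illustrative choices, not the connector's behaviour.

```java
import java.util.concurrent.ThreadLocalRandom;

public final class FullJitterSketch {

    /** Full jitter: sleep = random_between(0, min(maxMillis, baseMillis * 2^attempt)). */
    static long fullJitterSleepMillis(long baseMillis, long maxMillis, int attempt) {
        // Cap the exponent so the shift cannot overflow for large attempt counts.
        long exponentialCap = Math.min(maxMillis, baseMillis * (1L << Math.min(attempt, 30)));
        return ThreadLocalRandom.current().nextLong(exponentialCap + 1);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.printf(
                    "attempt %d -> sleeping %d ms%n",
                    attempt, fullJitterSleepMillis(100L, 10_000L, attempt));
        }
    }
}
```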
Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]
gguptp commented on code in PR #146: URL: https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662908016 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.proxy; + +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; +import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException; +import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; +import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; +import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.Shard; +import software.amazon.awssdk.services.dynamodb.model.StreamStatus; +import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** Implementation of the {@link StreamProxy} for DynamoDB streams. 
*/ +public class DynamoDbStreamsProxy implements StreamProxy { + +private final DynamoDbStreamsClient dynamoDbStreamsClient; +private final SdkHttpClient httpClient; +private final Map shardIdToIteratorStore; + +private static final FullJitterBackoff BACKOFF = new FullJitterBackoff(); + +private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsProxy.class); + +public DynamoDbStreamsProxy(DynamoDbStreamsClient dynamoDbStreamsClient, SdkHttpClient httpClient) { +this.dynamoDbStreamsClient = dynamoDbStreamsClient; +this.httpClient = httpClient; +this.shardIdToIteratorStore = new ConcurrentHashMap<>(); +} + +@Override +public List listShards(String streamArn, @Nullable String lastSeenShardId) { +return this.getShardsOfStream(streamArn, lastSeenShardId); +} + +@Override +public GetRecordsResponse getRecords(String streamArn, String shardId, StartingPosition startingPosition) { +String shardIterator = +shardIdToIteratorStore.computeIfAbsent( +shardId, (s) -> getShardIterator(streamArn, s, startingPosition)); + +if (shardIterator == null) { +return null; +} +try { +GetRecordsResponse getRecordsResponse = getRecords(shardIterator); +if (getRecordsResponse.nextShardIterator() != null) { +shardIdToIteratorStore.put(shardId, getRecordsResponse.nextShardIterator()); +} +return getRecordsResponse; +} catch (ExpiredIteratorException e) { +// Eagerly retry getRecords() if the iterator is expired +shardIterator = getShardIterator(streamArn, shardId, startingPosition); +GetRecordsResponse getRecordsResponse = getRecords(shardIterator); +if (getRecordsResponse.nextShardIterator() != null) { +shardIdToIteratorStore.put(shardId, getRecordsResponse.nextShardIterator()); +} +return getRecordsResponse; +} +} + +@Override +public void close() throws IOException { +dynamoDbStreamsClient.close(); +httpClient.close(); +} + +private List getShardsOfStream( +String streamName, @Nullable String lastSeenShardId) { +List shardsOfStream = new ArrayList<>(); + +DescribeStreamResponse describeStreamResponse; +do { +describeStreamResponse = this.describeStream(streamName, lastSeenShardId); +L
Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]
gguptp commented on code in PR #146: URL: https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662907223 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.proxy; + +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; +import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException; +import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; +import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; +import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.Shard; +import software.amazon.awssdk.services.dynamodb.model.StreamStatus; +import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** Implementation of the {@link StreamProxy} for DynamoDB streams. 
*/ +public class DynamoDbStreamsProxy implements StreamProxy { + +private final DynamoDbStreamsClient dynamoDbStreamsClient; +private final SdkHttpClient httpClient; +private final Map shardIdToIteratorStore; + +private static final FullJitterBackoff BACKOFF = new FullJitterBackoff(); + +private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsProxy.class); + +public DynamoDbStreamsProxy(DynamoDbStreamsClient dynamoDbStreamsClient, SdkHttpClient httpClient) { +this.dynamoDbStreamsClient = dynamoDbStreamsClient; +this.httpClient = httpClient; +this.shardIdToIteratorStore = new ConcurrentHashMap<>(); +} + +@Override +public List listShards(String streamArn, @Nullable String lastSeenShardId) { +return this.getShardsOfStream(streamArn, lastSeenShardId); +} + +@Override +public GetRecordsResponse getRecords(String streamArn, String shardId, StartingPosition startingPosition) { +String shardIterator = +shardIdToIteratorStore.computeIfAbsent( +shardId, (s) -> getShardIterator(streamArn, s, startingPosition)); + +if (shardIterator == null) { +return null; +} +try { +GetRecordsResponse getRecordsResponse = getRecords(shardIterator); +if (getRecordsResponse.nextShardIterator() != null) { +shardIdToIteratorStore.put(shardId, getRecordsResponse.nextShardIterator()); +} +return getRecordsResponse; +} catch (ExpiredIteratorException e) { +// Eagerly retry getRecords() if the iterator is expired +shardIterator = getShardIterator(streamArn, shardId, startingPosition); +GetRecordsResponse getRecordsResponse = getRecords(shardIterator); +if (getRecordsResponse.nextShardIterator() != null) { +shardIdToIteratorStore.put(shardId, getRecordsResponse.nextShardIterator()); +} +return getRecordsResponse; +} +} + +@Override +public void close() throws IOException { +dynamoDbStreamsClient.close(); +httpClient.close(); +} + +private List getShardsOfStream( +String streamName, @Nullable String lastSeenShardId) { +List shardsOfStream = new ArrayList<>(); + +DescribeStreamResponse describeStreamResponse; +do { +describeStreamResponse = this.describeStream(streamName, lastSeenShardId); +L
Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]
gguptp commented on code in PR #146: URL: https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662905134 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.proxy; + +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; +import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException; +import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; +import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; +import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.Shard; +import software.amazon.awssdk.services.dynamodb.model.StreamStatus; +import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** Implementation of the {@link StreamProxy} for DynamoDB streams. 
*/ +public class DynamoDbStreamsProxy implements StreamProxy { + +private final DynamoDbStreamsClient dynamoDbStreamsClient; +private final SdkHttpClient httpClient; +private final Map shardIdToIteratorStore; + +private static final FullJitterBackoff BACKOFF = new FullJitterBackoff(); + +private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsProxy.class); + +public DynamoDbStreamsProxy(DynamoDbStreamsClient dynamoDbStreamsClient, SdkHttpClient httpClient) { +this.dynamoDbStreamsClient = dynamoDbStreamsClient; +this.httpClient = httpClient; +this.shardIdToIteratorStore = new ConcurrentHashMap<>(); +} + +@Override +public List listShards(String streamArn, @Nullable String lastSeenShardId) { +return this.getShardsOfStream(streamArn, lastSeenShardId); +} + +@Override +public GetRecordsResponse getRecords(String streamArn, String shardId, StartingPosition startingPosition) { +String shardIterator = +shardIdToIteratorStore.computeIfAbsent( +shardId, (s) -> getShardIterator(streamArn, s, startingPosition)); + +if (shardIterator == null) { +return null; +} Review Comment: today, in DDB streams API, the only indication of shard being ended is the next shard iterator being null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]
gguptp commented on code in PR #146: URL: https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662903372 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.enumerator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition; +import org.apache.flink.connector.dynamodb.source.exception.DynamoDbStreamsSourceException; +import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy; +import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS; +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION; + +/** + * This class is used to discover and assign DynamoDb Streams splits to subtasks on the Flink cluster. This + * runs on the JobManager. 
+ */ +@Internal +public class DynamoDbStreamsSourceEnumerator +implements SplitEnumerator { + +private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsSourceEnumerator.class); + +private final SplitEnumeratorContext context; +private final String streamArn; +private final Configuration sourceConfig; +private final StreamProxy streamProxy; +private final DynamoDbStreamsShardAssigner shardAssigner; +private final ShardAssignerContext shardAssignerContext; + +private final Map> splitAssignment = new HashMap<>(); +private final Set assignedSplitIds = new HashSet<>(); +private final Set unassignedSplits; + +private String lastSeenShardId; + +public DynamoDbStreamsSourceEnumerator( +SplitEnumeratorContext context, +String streamArn, +Configuration sourceConfig, +StreamProxy streamProxy, +DynamoDbStreamsShardAssigner shardAssigner, +DynamoDbStreamsSourceEnumeratorState state) { +this.context = context; +this.streamArn = streamArn; +this.sourceConfig = sourceConfig; +this.streamProxy = streamProxy; +this.shardAssigner = shardAssigner; +this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context); +if (state == null) { +this.lastSeenShardId = null; +this.unassignedSplits = new HashSet<>(); +} else { +this.lastSeenShardId = state.getLastSeenShardId(); +this.unassignedSplits = state.getUnassignedSplits(); +} +} + +@Override +public void start() { +if (lastSeenShardId == null) { +context.callAsync(this::initialDiscoverSplits, this::assignSplits); +} + +final long shardDiscoveryInterval = sourceConfig.get(SHARD_DISCOVERY_INTERVAL_MILLIS); +context.callAsync( +this::periodicallyDiscoverSplits, +this::assignSplits, +shardDiscoveryInterval, +shardDiscoveryInterval); +} + +@Override +public void handleSplitRequest(int subtaskId, @Nullable String reques
Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]
gguptp commented on code in PR #146: URL: https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662903043 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.enumerator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition; +import org.apache.flink.connector.dynamodb.source.exception.DynamoDbStreamsSourceException; +import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy; +import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS; +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION; + +/** + * This class is used to discover and assign DynamoDb Streams splits to subtasks on the Flink cluster. This + * runs on the JobManager. 
+ */ +@Internal +public class DynamoDbStreamsSourceEnumerator +implements SplitEnumerator { + +private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsSourceEnumerator.class); + +private final SplitEnumeratorContext context; +private final String streamArn; +private final Configuration sourceConfig; +private final StreamProxy streamProxy; +private final DynamoDbStreamsShardAssigner shardAssigner; +private final ShardAssignerContext shardAssignerContext; + +private final Map> splitAssignment = new HashMap<>(); +private final Set assignedSplitIds = new HashSet<>(); +private final Set unassignedSplits; + +private String lastSeenShardId; + +public DynamoDbStreamsSourceEnumerator( +SplitEnumeratorContext context, +String streamArn, +Configuration sourceConfig, +StreamProxy streamProxy, +DynamoDbStreamsShardAssigner shardAssigner, +DynamoDbStreamsSourceEnumeratorState state) { +this.context = context; +this.streamArn = streamArn; +this.sourceConfig = sourceConfig; +this.streamProxy = streamProxy; +this.shardAssigner = shardAssigner; +this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context); +if (state == null) { +this.lastSeenShardId = null; +this.unassignedSplits = new HashSet<>(); +} else { +this.lastSeenShardId = state.getLastSeenShardId(); +this.unassignedSplits = state.getUnassignedSplits(); +} +} + +@Override +public void start() { +if (lastSeenShardId == null) { +context.callAsync(this::initialDiscoverSplits, this::assignSplits); +} + +final long shardDiscoveryInterval = sourceConfig.get(SHARD_DISCOVERY_INTERVAL_MILLIS); +context.callAsync( +this::periodicallyDiscoverSplits, +this::assignSplits, +shardDiscoveryInterval, +shardDiscoveryInterval); +} + +@Override +public void handleSplitRequest(int subtaskId, @Nullable String reques
[jira] [Commented] (FLINK-35695) Release Testing: Verify FLINK-32315: Support local file upload in K8s mode
[ https://issues.apache.org/jira/browse/FLINK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862530#comment-17862530 ] Mate Czagany commented on FLINK-35695: -- h2. Verification Conclusion Using the steps described below, I could confirm that the new feature works as expected and documented. > Release Testing: Verify FLINK-32315: Support local file upload in K8s mode > -- > > Key: FLINK-35695 > URL: https://issues.apache.org/jira/browse/FLINK-35695 > Project: Flink > Issue Type: Sub-task > Components: Client / Job Submission >Reporter: Ferenc Csaky >Assignee: Mate Czagany >Priority: Blocker > Labels: release-testing > Fix For: 1.20.0 > > Attachments: image-2024-07-01-14-54-17-770.png, > image-2024-07-01-15-04-53-764.png > > > Follow up the test for FLINK-32315. > In Flink 1.20, we introduced a local file upload possibility for Kubernetes > deployments. To verify this feature, you can check the relevant > [PR|https://github.com/apache/flink/pull/24303], which includes the docs, and > examples for more information. > To test this feature, it is required to have an available Kubernetes cluster > to deploy to, and some DFS where Flink can deploy the local JAR. For a > sandbox setup, I recommend to install {{minikube}}. The flink-k8s-operator > [quickstart > guide|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/#prerequisites] > explains that pretty well ({{helm}} is not needed here). For the DFS, I have > a gist to setup Minio on a K8s pod > [here|https://gist.github.com/ferenc-csaky/fd7fee71d89cd389cac2da4a4471ab65]. > The two following main use-case should be handled correctly: > # Deploy job with a local job JAR, but without further dependencies > {code:bash} > $ ./bin/flink run-application \ > --target kubernetes-application \ > -Dkubernetes.cluster-id=my-first-application-cluster \ > -Dkubernetes.container.image=flink:1.20 \ > -Dkubernetes.artifacts.local-upload-enabled=true \ > -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \ > local:///path/to/TopSpeedWindowing.jar > {code} > # Deploy job with a local job JAR, and further dependencies (e.g. a UDF > included in a separate JAR). > {code:bash} > $ ./bin/flink run-application \ > --target kubernetes-application \ > -Dkubernetes.cluster-id=my-first-application-cluster \ > -Dkubernetes.container.image=flink:1.20 \ > -Dkubernetes.artifacts.local-upload-enabled=true \ > -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \ > > -Duser.artifacts.artifact-list=local:///tmp/my-flink-udf1.jar\;s3://my-bucket/my-flink-udf2.jar > \ > local:///tmp/my-flink-job.jar > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]
gguptp commented on code in PR #146: URL: https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662903713 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.proxy; + +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; +import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; +import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException; +import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; +import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; +import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException; +import software.amazon.awssdk.services.dynamodb.model.Shard; +import software.amazon.awssdk.services.dynamodb.model.StreamStatus; +import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** Implementation of the {@link StreamProxy} for DynamoDB streams. */ +public class DynamoDbStreamsProxy implements StreamProxy { Review Comment: Good point. WIll do -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-31215) Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators
[ https://issues.apache.org/jira/browse/FLINK-31215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-31215: --- Labels: pull-request-available (was: ) > Backpropagate processing rate limits from non-scalable bottlenecks to > upstream operators > > > Key: FLINK-31215 > URL: https://issues.apache.org/jira/browse/FLINK-31215 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Gyula Fora >Assignee: Artem Plyusnin >Priority: Major > Labels: pull-request-available > > The current algorithm scales operators based on input data rates by > propagating them forward through the graph. > However, there are cases where a certain operator's processing capacity is > limited, either because it has a set maxParallelism or the user excludes it > from scaling (or the capacity otherwise does not increase with scaling). > In these cases it doesn't make sense to scale upstream operators to the > target data rate if the job is going to be bottlenecked by a downstream > operator. Instead, we should backpropagate the limit based on the > non-scalable bottleneck. -- This message was sent by Atlassian Jira (v8.20.10#820010)
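To make the backpropagation idea concrete, a tiny illustrative sketch (not the autoscaler's actual algorithm) that walks a linear operator chain from sink to source and caps every upstream target rate at the tightest downstream capacity:

{code:java}
import java.util.Arrays;

public class BackpropagationSketch {

    /**
     * targetRates[i] is the forward-propagated target processing rate of operator i;
     * maxCapacity[i] is its achievable capacity (Double.MAX_VALUE if freely scalable).
     * Walking backwards caps every upstream target at the tightest downstream limit.
     */
    static double[] backpropagateLimits(double[] targetRates, double[] maxCapacity) {
        double[] adjusted = targetRates.clone();
        double limit = Double.MAX_VALUE;
        for (int i = adjusted.length - 1; i >= 0; i--) {
            limit = Math.min(limit, maxCapacity[i]);
            adjusted[i] = Math.min(adjusted[i], limit);
        }
        return adjusted;
    }

    public static void main(String[] args) {
        double[] targets = {1000, 1000, 1000};                          // records/s from input rate
        double[] capacity = {Double.MAX_VALUE, 400, Double.MAX_VALUE};  // operator 1 cannot scale past 400
        // Without backpropagation, operator 0 would be scaled for 1000 rec/s it can never push downstream.
        System.out.println(Arrays.toString(backpropagateLimits(targets, capacity)));
        // -> [400.0, 400.0, 1000.0]
    }
}
{code}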
Re: [PR] [FLINK-31215] [autoscaler] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators [flink-kubernetes-operator]
aplyusnin commented on PR #847: URL: https://github.com/apache/flink-kubernetes-operator/pull/847#issuecomment-2203883673 Hi, @gyfora, could you review the code and run the workflows, please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35730) PipelineDefinitionParser add parse string method
[ https://issues.apache.org/jira/browse/FLINK-35730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35730: --- Labels: pull-request-available (was: ) > PipelineDefinitionParser add parse string method > > > Key: FLINK-35730 > URL: https://issues.apache.org/jira/browse/FLINK-35730 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: Wenkai Qi >Priority: Not a Priority > Labels: pull-request-available > Original Estimate: 24h > Remaining Estimate: 24h > > /** > * Parse the specified pipeline definition string, merge global > configurations, then generate > * the {@link PipelineDef}. > */ > PipelineDef parse(String text, Configuration globalPipelineConfig) throws > Exception; > > Adding this method makes it easier for third-party platforms to use this > interface to parse a definition string into a PipelineDef. -- This message was sent by Atlassian Jira (v8.20.10#820010)
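A short usage sketch of the proposed overload from a third-party platform's point of view. It assumes the existing YAML-based parser (YamlPipelineDefinitionParser) implements the new method and that the "values" test connector is available; both the class names and the YAML content are illustrative assumptions.

{code:java}
import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.definition.PipelineDef;

public class ParseStringSketch {
    public static void main(String[] args) throws Exception {
        // In-memory pipeline definition; normally this would come from the platform's own storage.
        String pipelineYaml =
                "source:\n"
                        + "  type: values\n"
                        + "sink:\n"
                        + "  type: values\n"
                        + "pipeline:\n"
                        + "  name: embedded-definition\n"
                        + "  parallelism: 1\n";

        // Global configuration that would normally come from conf/flink-cdc.yaml.
        Configuration globalConfig = new Configuration();

        // The new overload lets platforms pass the definition as a string
        // instead of writing it to a temporary file first.
        PipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
        PipelineDef pipelineDef = parser.parse(pipelineYaml, globalConfig);
        System.out.println(pipelineDef);
    }
}
{code}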
Re: [PR] Flink Kubernetes Operator 1.9.0 [flink-web]
gyfora merged PR #747: URL: https://github.com/apache/flink-web/pull/747 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Support config yaml [flink-kubernetes-operator]
ctrlaltdilj commented on PR #848: URL: https://github.com/apache/flink-kubernetes-operator/pull/848#issuecomment-2203244208 Thanks @mateczagany for all the info! I have requested a JIRA account, and will file a ticket -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35730][cdc-cli] PipelineDefinitionParser add parse string method [flink-cdc]
aiwenmo commented on PR #3444: URL: https://github.com/apache/flink-cdc/pull/3444#issuecomment-2203208303 @leonardBang PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35742] Don't create RocksDB CF if task cancellation is in progress [flink]
rkhachatryan opened a new pull request, #25011: URL: https://github.com/apache/flink/pull/25011 We observe a lot of TMs stuck for > 30s in RocksDBHandle.registerStateColumnFamilyHandleWithImport, which boils down to native calls that create a Column Family. This change prevents CF creation if task cancellation is in progress. ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
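A minimal sketch of the guard described in the PR summary, using the public RocksJava API. The cancellation flag and how it is wired into the task lifecycle are placeholders rather than Flink's actual internals.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class GuardedColumnFamilyFactory {

    // Placeholder for the task's cancellation state; in Flink this information would
    // come from the task/backend lifecycle, not from a free-standing flag.
    private final AtomicBoolean cancelRequested = new AtomicBoolean(false);

    public ColumnFamilyHandle createColumnFamily(RocksDB db, String stateName)
            throws RocksDBException {
        // Skip the (potentially slow) native call entirely once cancellation has started,
        // so the cancellation watchdog does not observe the task stuck in JNI.
        if (cancelRequested.get()) {
            throw new IllegalStateException(
                    "Task is being cancelled, refusing to create column family " + stateName);
        }
        return db.createColumnFamily(
                new ColumnFamilyDescriptor(stateName.getBytes(StandardCharsets.UTF_8)));
    }

    public void markCancelled() {
        cancelRequested.set(true);
    }
}
```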
Re: [PR] [FLINK-35643][doc] Add materialized table statement doc [flink]
hackergin commented on code in PR #24975: URL: https://github.com/apache/flink/pull/24975#discussion_r1662619373 ## docs/content/docs/dev/table/materialized-table/quick-start.md: ## @@ -0,0 +1,333 @@ +--- +title: Quick Start +weight: 3 +type: docs +aliases: +- /dev/table/materialized-table/quick-start.html +--- + + +# Quick Start Guide + +This guide will help you quickly understand and get started with materialized tables. It includes setting up the environment, creating materialized tables in CONTINUOUS mode, and creating materialized tables in FULL mode. + +## Environment Setup + +### Directory Preparation + +**Replace the example paths below with real paths on your machine.** + +- Create directories for Catalog Store and Catalog dependencies: + +``` +# Directory for File Catalog Store to save catalog information +mkdir -p /path/to/catalog/store + +# Directory for test-filesystem Catalog to save table metadata and table data +mkdir -p /path/to/catalog/test-filesystem + +# Default database for test-filesystem Catalog +mkdir -p /path/to/catalog/test-filesystem/mydb +``` + +- Create directories for Checkpoints and Savepoints to save Checkpoints and Savepoints respectively: + +``` +mkdir -p /path/to/checkpoint + +mkdir -p /path/to/savepoint +``` + +### Dependency Preparation + +The method here is similar to the steps recorded in [local installation]({{< ref "docs/try-flink/local_installation" >}}). Flink can run on any UNIX-like operating system, such as Linux, Mac OS X, and Cygwin (for Windows). You need to have __Java 11__ installed locally. You can check the installed Java version with the following command: Review Comment: Here I referenced the descriptions from other chapters. From my understanding, it is to inform users that Java 8 is no longer supported, and to discourage its use even though it is currently supported. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35743) The time zone configuration for temporal functions is not effective
[ https://issues.apache.org/jira/browse/FLINK-35743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35743: --- Labels: pull-request-available (was: ) > The time zone configuration for temporal functions is not effective > --- > > Key: FLINK-35743 > URL: https://issues.apache.org/jira/browse/FLINK-35743 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.1 >Reporter: Wenkai Qi >Priority: Major > Labels: pull-request-available > Fix For: cdc-3.1.1 > > Original Estimate: 24h > Remaining Estimate: 24h > > The time zone configuration for temporal functions is not effective, > including now() and date_format(). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35742) Don't create RocksDB CF if task cancellation is in progress
[ https://issues.apache.org/jira/browse/FLINK-35742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35742: --- Labels: pull-request-available (was: ) > Don't create RocksDB CF if task cancellation is in progress > --- > > Key: FLINK-35742 > URL: https://issues.apache.org/jira/browse/FLINK-35742 > Project: Flink > Issue Type: Improvement >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35643][doc] Add materialized table statement doc [flink]
hackergin commented on code in PR #24975: URL: https://github.com/apache/flink/pull/24975#discussion_r166970 ## docs/content/docs/dev/table/materialized-table/quick-start.md: ## @@ -0,0 +1,333 @@ +--- +title: Quick Start +weight: 3 +type: docs +aliases: +- /dev/table/materialized-table/quick-start.html +--- + + +# Quick Start Guide + +This guide will help you quickly understand and get started with materialized tables. It includes setting up the environment, creating materialized tables in CONTINUOUS mode, and creating materialized tables in FULL mode. + +## Environment Setup + +### Directory Preparation + +**Replace the example paths below with real paths on your machine.** + +- Create directories for Catalog Store and Catalog dependencies: + +``` +# Directory for File Catalog Store to save catalog information +mkdir -p /path/to/catalog/store + +# Directory for test-filesystem Catalog to save table metadata and table data +mkdir -p /path/to/catalog/test-filesystem + +# Default database for test-filesystem Catalog +mkdir -p /path/to/catalog/test-filesystem/mydb +``` + +- Create directories for Checkpoints and Savepoints to save Checkpoints and Savepoints respectively: + +``` +mkdir -p /path/to/checkpoint + +mkdir -p /path/to/savepoint +``` + +### Dependency Preparation + +The method here is similar to the steps recorded in [local installation]({{< ref "docs/try-flink/local_installation" >}}). Flink can run on any UNIX-like operating system, such as Linux, Mac OS X, and Cygwin (for Windows). You need to have __Java 11__ installed locally. You can check the installed Java version with the following command: Review Comment: Yes, but we need to avoid mentions of Java8 Because of https://issues.apache.org/jira/browse/FLINK-25247 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2. [flink]
fapaul commented on code in PR #24998: URL: https://github.com/apache/flink/pull/24998#discussion_r1661069156 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java: ## @@ -337,6 +337,20 @@ void testChainNodeSetParallelism() { assertThat(vertices.get(0).isParallelismConfigured()).isTrue(); } +@Test +void testParallelismConfiguredForSinkV2() { Review Comment: Isn't this test better placed in `SinkTransformationTranslatorITCaseBase`? ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java: ## @@ -357,7 +357,9 @@ private R adjustTransformations( // In this case, the subTransformation does not contain any customized // parallelism value and will therefore inherit the parallelism value // from the sinkTransformation. - subTransformation.setParallelism(transformation.getParallelism()); +subTransformation.setParallelism( +transformation.getParallelism(), +transformation.isParallelismConfigured()); Review Comment: I haven't followed the runtime development for a while but is `isParallelismConfigured` also applicable for streaming jobs or only batch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35743][cdc-runtime] Fix the time zone configuration for temporal functions is not effective [flink-cdc]
aiwenmo commented on PR #3449: URL: https://github.com/apache/flink-cdc/pull/3449#issuecomment-2203201015 @yuxiqian PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35742] Don't create RocksDB CF if task cancellation is in progress [flink]
flinkbot commented on PR #25011: URL: https://github.com/apache/flink/pull/25011#issuecomment-2203191141 ## CI report: * 30dc775c032759e64215775dba3f7528aa3b5301 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2. [flink]
JunRuiLee commented on PR #24998: URL: https://github.com/apache/flink/pull/24998#issuecomment-2203161525 Squashed the commits and rebased onto master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.20][FLINK-35734][table] Do not override the user-defined checkpoint interval in continuous mode [flink]
flinkbot commented on PR #25005: URL: https://github.com/apache/flink/pull/25005#issuecomment-2200185822 ## CI report: * 765d5ce23ee57c25cde2ac93c22596eff314e6e7 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [BP-1.20][FLINK-35734][table] Do not override the user-defined checkpoint interval in continuous mode [flink]
hackergin opened a new pull request, #25005: URL: https://github.com/apache/flink/pull/25005 ## What is the purpose of the change Currently, in continuous mode, the checkpoint interval is set based on freshness by default. However, if the user explicitly sets a checkpoint interval, we should follow the user's setting. ## Brief change log * Fix periodic refresh job naming. * Do not override the user-defined checkpoint interval in continuous mode ## Verifying this change * Add test `testCreateMaterializedTableInContinuousModeWithCustomCheckpointInterval` in MaterializedTableStatementITCase to verify that we don't over write the user custom checkpoint interval. BP 1.20 for https://github.com/apache/flink/pull/25002 ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
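For reference, the behavior described above amounts to a guard of roughly the following shape. This is a sketch, not the actual patch; the surrounding class, method name, and the choice of `ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL` as the relevant option are assumptions.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;

import java.time.Duration;

final class CheckpointIntervalSketch {

    /**
     * Derive the checkpoint interval for a continuous-mode refresh job: keep the user's
     * explicit setting if present, otherwise fall back to the freshness-derived value.
     */
    static Configuration applyCheckpointInterval(Configuration jobConfig, Duration freshness) {
        if (!jobConfig.contains(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)) {
            // Only set the interval from freshness when the user did not configure one.
            jobConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, freshness);
        }
        return jobConfig;
    }
}
```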
Re: [PR] Release flink-shaded 19.0 [flink-web]
MartijnVisser commented on code in PR #749: URL: https://github.com/apache/flink-web/pull/749#discussion_r1661043966 ## docs/data/additional_components.yml: ## @@ -33,12 +39,6 @@ flink-shaded-17.0: source_release_asc_url: "https://downloads.apache.org/flink/flink-shaded-17.0/flink-shaded-17.0-src.tgz.asc"; source_release_sha512_url: "https://downloads.apache.org/flink/flink-shaded-17.0/flink-shaded-17.0-src.tgz.sha512"; -flink-shaded-16.2: - name: "Apache Flink-shaded 16.2 Source Release" - source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-shaded-16.2/flink-shaded-16.2-src.tgz"; - source_release_asc_url: "https://downloads.apache.org/flink/flink-shaded-16.2/flink-shaded-16.2-src.tgz.asc"; - source_release_sha512_url: "https://downloads.apache.org/flink/flink-shaded-16.2/flink-shaded-16.2-src.tgz.sha512"; - Review Comment: Do we only keep the latest version of shaded, or do we keep all of them? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35731) Sink V2 operator is mistakenly assumed always to be parallelism configured
[ https://issues.apache.org/jira/browse/FLINK-35731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35731: --- Labels: pull-request-available (was: ) > Sink V2 operator is mistakenly assumed always to be parallelism configured > -- > > Key: FLINK-35731 > URL: https://issues.apache.org/jira/browse/FLINK-35731 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Junrui Li >Priority: Major > Labels: pull-request-available > > Currently, the Sink V2 operator is always marked as parallelism configured, > which prevents parallelism from being inferred. This can cause confusion for > users utilizing the Adaptive Batch scheduler. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35552][runtime] Moves CheckpointStatsTracker out of DefaultExecutionGraphFactory into Scheduler [flink]
XComp commented on code in PR #24911: URL: https://github.com/apache/flink/pull/24911#discussion_r1647485917 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java: ## @@ -81,23 +82,20 @@ public void reportCheckpointMetrics( public void reportInitializationMetrics( ExecutionAttemptID executionAttemptId, SubTaskInitializationMetrics initializationMetrics) { -if (executionGraph.getCheckpointStatsTracker() == null) { +final CheckpointCoordinatorConfiguration checkpointConfig = +executionGraph.getCheckpointCoordinatorConfiguration(); +if (checkpointConfig == null || !checkpointConfig.isCheckpointingEnabled()) { // TODO: Consider to support reporting initialization stats without checkpointing log.debug( "Ignoring reportInitializationMetrics if checkpoint coordinator is not present"); Review Comment: Ok, the work I described in [my comment above](https://github.com/apache/flink/pull/24911#discussion_r1647484212) makes it necessary to verify the code. So, I might fix that along the way as well. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1660998380 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,111 @@ + + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + +flink-connector-aws-e2e-tests-parent +org.apache.flink +4.3-SNAPSHOT Review Comment: This should be `4.4` ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,111 @@ + + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + +flink-connector-aws-e2e-tests-parent +org.apache.flink +4.3-SNAPSHOT Review Comment: This should be `4.4-SNAPSHOT` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35734) Do not override the user-defined checkpoint interval in continuous mode.
[ https://issues.apache.org/jira/browse/FLINK-35734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35734: --- Labels: pull-request-available (was: ) > Do not override the user-defined checkpoint interval in continuous mode. > > > Key: FLINK-35734 > URL: https://issues.apache.org/jira/browse/FLINK-35734 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API, Table SQL / Gateway >Reporter: Feng Jin >Assignee: Feng Jin >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > {color:#00}Currently, in continuous mode, the checkpoint interval is set > based on freshness by default. However, if the user explicitly sets a > checkpoint interval, we should follow the user's setting.{color} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35643][doc] Add materialized table statement doc [flink]
lsyldliu commented on code in PR #24975: URL: https://github.com/apache/flink/pull/24975#discussion_r1660970466 ## docs/content/docs/dev/table/materialized-table/syntax.md: ## @@ -0,0 +1,337 @@ +--- +title: Syntax Review Comment: Statements ## docs/content/docs/dev/table/materialized-table/quick-start.md: ## @@ -0,0 +1,333 @@ +--- +title: Quick Start +weight: 3 +type: docs +aliases: +- /dev/table/materialized-table/quick-start.html +--- + + +# Quick Start Guide + +This guide will help you quickly understand and get started with materialized tables. It includes setting up the environment, creating materialized tables in CONTINUOUS mode, and creating materialized tables in FULL mode. Review Comment: This guide will help you quickly understand and get started with materialized tables. It includes setting up the environment and creating, altering, and dropping materialized tables in CONTINUOUS and FULL mode. ## docs/content/docs/dev/table/materialized-table/quick-start.md: ## @@ -0,0 +1,333 @@ +--- +title: Quick Start +weight: 3 +type: docs +aliases: +- /dev/table/materialized-table/quick-start.html Review Comment: quick-start -> quickstart ## docs/content/docs/dev/table/materialized-table/quick-start.md: ## @@ -0,0 +1,333 @@ +--- +title: Quick Start +weight: 3 +type: docs +aliases: +- /dev/table/materialized-table/quick-start.html +--- + + +# Quick Start Guide + +This guide will help you quickly understand and get started with materialized tables. It includes setting up the environment, creating materialized tables in CONTINUOUS mode, and creating materialized tables in FULL mode. + +## Environment Setup + +### Directory Preparation + +**Replace the example paths below with real paths on your machine.** + +- Create directories for Catalog Store and Catalog dependencies: + +``` +# Directory for File Catalog Store to save catalog information +mkdir -p /path/to/catalog/store + +# Directory for test-filesystem Catalog to save table metadata and table data +mkdir -p /path/to/catalog/test-filesystem + +# Default database for test-filesystem Catalog +mkdir -p /path/to/catalog/test-filesystem/mydb +``` + +- Create directories for Checkpoints and Savepoints to save Checkpoints and Savepoints respectively: + +``` +mkdir -p /path/to/checkpoint + +mkdir -p /path/to/savepoint +``` + +### Dependency Preparation + +The method here is similar to the steps recorded in [local installation]({{< ref "docs/try-flink/local_installation" >}}). Flink can run on any UNIX-like operating system, such as Linux, Mac OS X, and Cygwin (for Windows). You need to have __Java 11__ installed locally. You can check the installed Java version with the following command: + +``` +java -version +``` + +Next, [download](https://flink.apache.org/downloads/) the latest Flink binary package and extract it: + +``` +tar -xzf flink-*.tgz +``` + +Download the [test-filesystem](https://https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-filesystem-test-utils/) connector and place it in the lib directory. 
+ +``` +cp flink-table-filesystem-test-utils-{VERSION}.jar flink-*/lib/ +``` + +### Configuration Preparation + +Edit the config.yaml file and add the following configurations: + +```yaml +execution: + checkpoints: +dir: file:///path/to/savepoint + +# Configure file catalog +table: + catalog-store: +kind: file +file: + path: /path/to/catalog/store + +# Configure embedded scheduler +workflow-scheduler: + type: embedded + +# Configure SQL gateway address and port +sql-gateway: + endpoint: +rest: + address: 127.0.0.1 + port: 8083 +``` + +### Start Flink Cluster + +Run the following script to start the cluster locally: + +``` +./bin/start-cluster.sh +``` + +### Start SQL Gateway + +Run the following script to start the SQL Gateway locally: + +``` +./sql-gateway.sh start +``` + +### Start SQL Client + +Run the following script to start the SQL Client locally: + +``` +./sql-client.sh gateway --endpoint http://127.0.0.1:8083 Review Comment: ./bin/ ## docs/content/docs/dev/table/materialized-table/quick-start.md: ## @@ -0,0 +1,333 @@ +--- +title: Quick Start +weight: 3 +type: docs +aliases: +- /dev/table/materialized-table/quick-start.html +--- + + +# Quick Start Guide + +This guide will help you quickly understand and get started with materialized tables. It includes setting up the environment, creating materialized tables in CONTINUOUS mode, and creating materialized tables in FULL mode. + +## Environment Setup + +### Directory Preparation + +**Replace the example paths below with real paths on your machine.** + +- Create directories for Catalog Store and Catalog dependencies: + +``` +# Directory for File Catalog Store to save catalog information +mkdir -p /path/to/catalog/store + +# Directory for test-filesystem Catalog to save table metadata and table data +mkdir -p /path/to/catalog/
Re: [PR] [hotfix][parquet] fix dependences and variable name problems in FLINK-35702 [flink]
flinkbot commented on PR #25004: URL: https://github.com/apache/flink/pull/25004#issuecomment-2200079714 ## CI report: * 9fba47c2d98e83aeebc51347847f14ee7f067445 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix][parquet] fix dependences and variable name problems in FLINK-35702 [flink]
Stephen0421 opened a new pull request, #25004: URL: https://github.com/apache/flink/pull/25004 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - fix parquet flink-runtime dependency version - optimize put null flags logics - relocation exclude three flink-runtime class. ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is already covered by existing tests, such as org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReaderTest ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper:no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35734][table] Do not override the user-defined checkpoint interval in continuous mode. [flink]
lsyldliu merged PR #25002: URL: https://github.com/apache/flink/pull/25002 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35702][format][parquet] support parquet read nested. [flink]
Stephen0421 commented on PR #24991: URL: https://github.com/apache/flink/pull/24991#issuecomment-2197896793 > @Stephen0421, could you please look at this issue https://issues.apache.org/jira/projects/FLINK/issues/FLINK-35698 It seems to be connected to your work. There is a problem with Parquet reading nested data when the field type is DECIMAL. I provided a test case there. Hello @empathy87, I ran your test case on my local PR branch and it ran successfully; I think this PR solves the problem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Re: [PR] [FLINK-35702][format][parquet] support parquet read nested. [flink]
Stephen0421 commented on code in PR #24991: URL: https://github.com/apache/flink/pull/24991#discussion_r1659515022 ## flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java: ## @@ -578,4 +569,167 @@ public static WritableColumnVector createWritableColumnVector( throw new UnsupportedOperationException(fieldType + " is not supported now."); } } + +public static List buildFieldsList( +List childrens, List fieldNames, MessageColumnIO columnIO) { +List list = new ArrayList<>(); +for (int i = 0; i < childrens.size(); i++) { +list.add( +constructField( +childrens.get(i), lookupColumnByName(columnIO, fieldNames.get(i; +} +return list; +} + +private static ParquetField constructField(RowType.RowField rowField, ColumnIO columnIO) { +boolean required = columnIO.getType().getRepetition() == REQUIRED; +int repetitionLevel = columnIO.getRepetitionLevel(); +int definitionLevel = columnIO.getDefinitionLevel(); +LogicalType type = rowField.getType(); +String filedName = rowField.getName(); +if (type instanceof RowType) { +GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO; +RowType rowType = (RowType) type; +ImmutableList.Builder fieldsBuilder = ImmutableList.builder(); +List fieldNames = rowType.getFieldNames(); +List childrens = rowType.getFields(); +for (int i = 0; i < childrens.size(); i++) { +fieldsBuilder.add( +constructField( +childrens.get(i), +lookupColumnByName(groupColumnIO, fieldNames.get(i; +} + +return new ParquetGroupField( +type, repetitionLevel, definitionLevel, required, fieldsBuilder.build()); +} + +if (type instanceof MapType) { +GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO; +GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO); +MapType mapType = (MapType) type; +ParquetField keyField = +constructField( +new RowType.RowField("", mapType.getKeyType()), +keyValueColumnIO.getChild(0)); +ParquetField valueField = +constructField( +new RowType.RowField("", mapType.getValueType()), +keyValueColumnIO.getChild(1)); +return new ParquetGroupField( +type, +repetitionLevel, +definitionLevel, +required, +ImmutableList.of(keyField, valueField)); +} + +if (type instanceof MultisetType) { +GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO; +GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO); +MultisetType multisetType = (MultisetType) type; +ParquetField keyField = +constructField( +new RowType.RowField("", multisetType.getElementType()), +keyValueColumnIO.getChild(0)); +ParquetField valueField = +constructField( +new RowType.RowField("", new IntType()), keyValueColumnIO.getChild(1)); +return new ParquetGroupField( +type, +repetitionLevel, +definitionLevel, +required, +ImmutableList.of(keyField, valueField)); +} + +if (type instanceof ArrayType) { +ArrayType arrayType = (ArrayType) type; +ColumnIO elementTypeColumnIO; +if (columnIO instanceof GroupColumnIO) { +GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO; +if (!StringUtils.isNullOrWhitespaceOnly(filedName)) { +while (!Objects.equals(groupColumnIO.getName(), filedName)) { +groupColumnIO = (GroupColumnIO) groupColumnIO.getChild(0); +} +elementTypeColumnIO = groupColumnIO; +} else { +if (arrayType.getElementType() instanceof RowType) { +elementTypeColumnIO = groupColumnIO; +} else { +elementTypeColumnIO = groupColumnIO.getChild(0); +} +} +} else if (columnIO instanceof PrimitiveColumnIO) { +elementTypeColumnIO = columnIO; +} else { +throw new RuntimeException(String.format("Unkown ColumnIO, %s", columnIO)); +
Re: [PR] [FLINK-35357][docs] Add kubernetes.operator.plugins.listeners config [flink-kubernetes-operator]
1996fanrui commented on code in PR #845: URL: https://github.com/apache/flink-kubernetes-operator/pull/845#discussion_r1659491268 ## docs/layouts/shortcodes/generated/dynamic_section.html: ## @@ -182,5 +182,11 @@ Map Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. Expected format: headerKey1:headerValue1,headerKey2:headerValue2. + + kubernetes.operator.plugins.listeners..class Review Comment: Actually, these HTML files are generated rather than edited manually. As I understand it, you need to update `KubernetesOperatorConfigOptions` and run `mvn clean install -DskipTests -Pgenerate-docs`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
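For readers unfamiliar with the generated docs: the option tables come from `ConfigOption` declarations plus the `generate-docs` Maven profile. A rough sketch of what such a declaration looks like follows; the field name and description text are assumptions for illustration, not the operator's actual code, and the real key is prefix-style with a user-chosen listener name.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ListenerOptionSketch {

    // Prefix-style option: the real key contains a user-chosen listener name,
    // i.e. kubernetes.operator.plugins.listeners.<name>.class.
    public static final ConfigOption<String> PLUGIN_LISTENER_CLASS =
            ConfigOptions.key("kubernetes.operator.plugins.listeners.<name>.class")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Fully qualified class name of a custom FlinkResourceListener "
                                    + "plugin to load for the given listener name.");
}
```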
Re: [PR] [FLINK-35654][docs] Add CDC release verification & Installation guide [flink-cdc]
yuxiqian commented on PR #3430: URL: https://github.com/apache/flink-cdc/pull/3430#issuecomment-2197864754 Thanks for @morazow's comments! Addressed most of them, and left several that require further discussion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35357) Add "kubernetes.operator.plugins.listeners" parameter description to the Operator configuration document
[ https://issues.apache.org/jira/browse/FLINK-35357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35357: --- Labels: pull-request-available (was: ) > Add "kubernetes.operator.plugins.listeners" parameter description to the > Operator configuration document > > > Key: FLINK-35357 > URL: https://issues.apache.org/jira/browse/FLINK-35357 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: Yang Zhou >Assignee: Yang Zhou >Priority: Minor > Labels: pull-request-available > > In Flink Operator "Custom Flink Resource Listeners" in practice (doc: > [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource] > -listeners) > It was found that the "Operator Configuration Reference" document did not > explain the "Custom Flink Resource Listeners" configuration parameters. > So I wanted to come up with adding: > kubernetes.operator.plugins.listeners..class: > > , after all it is useful. > I want to submit a PR to optimize the document. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35654) Add Flink CDC verification guide in docs
[ https://issues.apache.org/jira/browse/FLINK-35654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35654: --- Labels: pull-request-available (was: ) > Add Flink CDC verification guide in docs > > > Key: FLINK-35654 > URL: https://issues.apache.org/jira/browse/FLINK-35654 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > Labels: pull-request-available > > Currently, ASF voting process requires vast quality verification before > releasing any new versions, including: > * Tarball checksum verification > * Compile from source code > * Run pipeline E2e tests > * Run migration tests > * Check if jar was packaged with correct JDK version > * ... > Adding verification guide in Flink CDC docs should help developers verify > future releases more easily. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35654][docs] Add CDC release verification & Installation guide [flink-cdc]
yuxiqian commented on code in PR #3430: URL: https://github.com/apache/flink-cdc/pull/3430#discussion_r1659427878 ## docs/content/docs/developer-guide/contribute-to-flink-cdc.md: ## @@ -82,3 +83,56 @@ not need a long description. 3. Are the Documentation Updated? If the pull request introduces a new feature, the feature should be documented. + +Release Verification Guide + +We will prepare for new releases of Flink CDC regularly. + +According to the Apache Software Foundation releasing SOP, +we will make a release candidate version before each release, +and invite community members to test and vote on this pre-release version. + +Everyone is welcomed to participate in the version verification work in `d...@flink.apache.org` mailing list. +The verification content may include the following aspects: + +1. Verify if source code could be compiled successfully. + +Currently, Flink CDC uses [Maven](https://maven.apache.org/) 3 as the build tool and compiles on the JDK 8 platform. Review Comment: Well it seems a little arbitrary. I could notice that CI uses Maven 3.8.6 but latest 3.1.1 release was compiled with 3.9.1. Not sure if all Maven 3+ works... ## docs/content/docs/developer-guide/contribute-to-flink-cdc.md: ## @@ -82,3 +83,56 @@ not need a long description. 3. Are the Documentation Updated? If the pull request introduces a new feature, the feature should be documented. + +Release Verification Guide + +We will prepare for new releases of Flink CDC regularly. + +According to the Apache Software Foundation releasing SOP, +we will make a release candidate version before each release, +and invite community members to test and vote on this pre-release version. + +Everyone is welcomed to participate in the version verification work in `d...@flink.apache.org` mailing list. +The verification content may include the following aspects: + +1. Verify if source code could be compiled successfully. + +Currently, Flink CDC uses [Maven](https://maven.apache.org/) 3 as the build tool and compiles on the JDK 8 platform. Review Comment: Well seems Maven version isn't always consistent. I noticed that CI uses Maven 3.8.6 but latest 3.1.1 release was compiled with 3.9.1. Not sure if all Maven 3+ works. ## docs/content/docs/developer-guide/contribute-to-flink-cdc.md: ## @@ -82,3 +83,56 @@ not need a long description. 3. Are the Documentation Updated? If the pull request introduces a new feature, the feature should be documented. + +Release Verification Guide + +We will prepare for new releases of Flink CDC regularly. + +According to the Apache Software Foundation releasing SOP, +we will make a release candidate version before each release, +and invite community members to test and vote on this pre-release version. + +Everyone is welcomed to participate in the version verification work in `d...@flink.apache.org` mailing list. +The verification content may include the following aspects: + +1. Verify if source code could be compiled successfully. + +Currently, Flink CDC uses [Maven](https://maven.apache.org/) 3 as the build tool and compiles on the JDK 8 platform. +You can download the RC version of the source code package and compile it using the `mvn clean package -Dfast` command, +and check if there's any unexpected errors or warnings. + +2. Verify if tarball checksum matches. + +To ensure the genuinity and integrity of released binary packages, a SHA512 hash value of the corresponding file is attached to any released binary tarball so that users can verify the integrity. 
+You can download the binary tarball of the RC version and calculate its SHA512 hash value with the following command: + +* Linux: `sha512sum flink-cdc-*-bin.tar.gz` +* macOS: `shasum -a 512 flink-cdc-*-bin.tar.gz` +* Windows (PowerShell): `Get-FileHash flink-cdc-*-bin.tar.gz -Algorithm SHA512 | Format-List` + +3. Verify that the binary package was compiled with JDK 8. + +Unpack the precompiled binary jar package and check if the `Build-Jdk` entry in the `META-INF\MANIFEST.MF` file is correct. + +4. Run migration tests. + +Flink CDC tries to ensure backward compatibility of state, that is, the job state (Checkpoint/Savepoint) saved with previous CDC version should be usable in the new version. +You can run CDC migration verification locally with [Flink CDC Migration Test Utils](https://github.com/yuxiqian/migration-test) script. + +* [Pipeline Job Migration Test Guide](https://github.com/yuxiqian/migration-test/blob/main/README.md) +* [DataStream Job Migration Test Guide](https://github.com/yuxiqian/migration-test/blob/main/datastream/README.md) Review Comment: I would prefer putting it in CDC repository along with existing CI workflows since it isn't significant enough to get an independent repo. But before all I need to clean up scripts before it could be accepted by any upstream repositories. ## docs/content/docs/developer-guide/contribute-to
Re: [PR] [FLINK-35654][docs] Add CDC release verification & Installation guide [flink-cdc]
yuxiqian commented on code in PR #3430: URL: https://github.com/apache/flink-cdc/pull/3430#discussion_r1659416389 ## README.md: ## @@ -27,7 +27,15 @@ full database synchronization, sharding table synchronization, schema evolution ![Flink CDC framework desigin](docs/static/fig/architecture.png) +### Installation +* Flink CDC tar could be downloaded from [Apache Flink Website](https://flink.apache.org/downloads/#apache-flink-cdc) or [GitHub Release Page](https://github.com/apache/flink-cdc/releases). +* Pipeline and source connectors could be downloaded from [Maven Central Repository](https://mvnrepository.com/artifact/org.apache.flink) or [GitHub Release Page](https://github.com/apache/flink-cdc/releases). +* If you're using Linux or macOS, you may install Flink CDC and connectors with Homebrew: Review Comment: Homebrew is also available on Linux (rarely used though). Currently, Flink CDC isn't provided in any other package managers. I could remove Linux here if it's confusing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [WIP] [FLINK-XXXX] [core] WIP Watermarking [flink]
jeyhunkarimov opened a new pull request, #24995: URL: https://github.com/apache/flink/pull/24995 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [WIP] [FLINK-XXXX] [core] WIP Watermarking [flink]
flinkbot commented on PR #24995: URL: https://github.com/apache/flink/pull/24995#issuecomment-2197770924 ## CI report: * b739e000f1ad7e9ce6f1dcbb3fbd2bfb27016083 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [WIP] [FLINK-XXXX] [core] WIP Watermarking [flink]
jeyhunkarimov closed pull request #24995: [WIP] [FLINK-] [core] WIP Watermarking URL: https://github.com/apache/flink/pull/24995 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35702) Support Parquet Nested Read
[ https://issues.apache.org/jira/browse/FLINK-35702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35702: --- Labels: pull-request-available (was: ) > Support Parquet Nested Read > --- > > Key: FLINK-35702 > URL: https://issues.apache.org/jira/browse/FLINK-35702 > Project: Flink > Issue Type: New Feature > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: Wenchao Wu >Assignee: Wenchao Wu >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > Now flink parquet doesn’t support read nested columns such as Array, > Array,Array in vectorized. This feature is aimed to solve this > problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35553][runtime] Wires up the RescaleManager with the CheckpointLifecycleListener interface [flink]
XComp commented on PR #24912: URL: https://github.com/apache/flink/pull/24912#issuecomment-2197023625 I addressed the last two comments and rebased the branch onto the most recent version of the parent PR #24911. That way the CI debug commit is also included. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35702][format][parquet] support parquet read nested. [flink]
empathy87 commented on PR #24991: URL: https://github.com/apache/flink/pull/24991#issuecomment-2197002034 @Stephen0421, could you please look at this issue https://issues.apache.org/jira/projects/FLINK/issues/FLINK-35698 It seems to be connected to your work. There is a problem with Parquet reading nested data when the field type is DECIMAL. I provided a test case there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35552][runtime] Moves CheckpointStatsTracker out of DefaultExecutionGraphFactory into Scheduler [flink]
ztison commented on code in PR #24911: URL: https://github.com/apache/flink/pull/24911#discussion_r1657013866 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java: ## @@ -214,402 +60,49 @@ void reportRestoredCheckpoint(RestoredCheckpointStats restored) { restored.getStateSize()); } -public void reportRestoredCheckpoint( +void reportRestoredCheckpoint( long checkpointID, CheckpointProperties properties, String externalPath, -long stateSize) { -statsReadWriteLock.lock(); -try { -counts.incrementRestoredCheckpoints(); -checkState( -jobInitializationMetricsBuilder.isPresent(), -"JobInitializationMetrics should have been set first, before RestoredCheckpointStats"); -jobInitializationMetricsBuilder -.get() -.setRestoredCheckpointStats(checkpointID, stateSize, properties, externalPath); -dirty = true; -} finally { -statsReadWriteLock.unlock(); -} -} +long stateSize); /** * Callback when a checkpoint completes. * * @param completed The completed checkpoint stats. */ -void reportCompletedCheckpoint(CompletedCheckpointStats completed) { -statsReadWriteLock.lock(); -try { -latestCompletedCheckpoint = completed; +void reportCompletedCheckpoint(CompletedCheckpointStats completed); -counts.incrementCompletedCheckpoints(); -history.replacePendingCheckpointById(completed); +PendingCheckpointStats getPendingCheckpointStats(long checkpointId); -summary.updateSummary(completed); +void reportIncompleteStats( +long checkpointId, ExecutionAttemptID attemptId, CheckpointMetrics metrics); -dirty = true; -logCheckpointStatistics(completed); -} finally { -statsReadWriteLock.unlock(); -} -} +void reportInitializationStarted( +Set toInitialize, long initializationStartTs); -/** - * Callback when a checkpoint fails. - * - * @param failed The failed checkpoint stats. - */ -void reportFailedCheckpoint(FailedCheckpointStats failed) { -statsReadWriteLock.lock(); -try { -counts.incrementFailedCheckpoints(); -history.replacePendingCheckpointById(failed); - -dirty = true; -logCheckpointStatistics(failed); -} finally { -statsReadWriteLock.unlock(); -} -} - -private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) { -try { -metricGroup.addSpan( -Span.builder(CheckpointStatsTracker.class, "Checkpoint") - .setStartTsMillis(checkpointStats.getTriggerTimestamp()) - .setEndTsMillis(checkpointStats.getLatestAckTimestamp()) -.setAttribute("checkpointId", checkpointStats.getCheckpointId()) -.setAttribute("fullSize", checkpointStats.getStateSize()) -.setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize()) -.setAttribute("checkpointStatus", checkpointStats.getStatus().name())); -if (LOG.isDebugEnabled()) { -StringWriter sw = new StringWriter(); -MAPPER.writeValue( -sw, - CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true)); -String jsonDump = sw.toString(); -LOG.debug( -"CheckpointStatistics (for jobID={}, checkpointId={}) dump = {} ", -metricGroup.jobId(), -checkpointStats.checkpointId, -jsonDump); -} -} catch (Exception ex) { -LOG.warn("Fail to log CheckpointStatistics", ex); -} -} - -/** - * Callback when a checkpoint failure without in progress checkpoint. For example, it should be - * callback when triggering checkpoint failure before creating PendingCheckpoint. 
- */ -public void reportFailedCheckpointsWithoutInProgress() { -statsReadWriteLock.lock(); -try { -counts.incrementFailedCheckpointsWithoutInProgress(); - -dirty = true; -} finally { -statsReadWriteLock.unlock(); -} -} - -public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) { -statsReadWriteLock.lock(); -try { -AbstractCheckpointStats stats = history.getCheckpointById(checkpointId); -return stats instanceof PendingCheckpointStats ? (PendingCheckpointStat
Re: [PR] [FLINK-35551][runtime] Introduces RescaleManager#onTrigger endpoint [flink]
XComp commented on PR #24910: URL: https://github.com/apache/flink/pull/24910#issuecomment-2194727428 [CI failure](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60525&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=10037) due to [FLINK-30719](https://issues.apache.org/jira/browse/FLINK-30719) ...again 🙄 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35551][runtime] Introduces RescaleManager#onTrigger endpoint [flink]
XComp commented on PR #24910: URL: https://github.com/apache/flink/pull/24910#issuecomment-2194727822 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Flink 33386 review splited commits for ci [flink]
RocMarshal commented on PR #24229: URL: https://github.com/apache/flink/pull/24229#issuecomment-2194703794 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]
loserwang1024 commented on code in PR #3319: URL: https://github.com/apache/flink-cdc/pull/3319#discussion_r1657064426 ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java: ## @@ -132,16 +172,237 @@ default boolean isEvenlySplitColumn(Column splitColumn) { * @param splitColumn dbz split column. * @return flink data type */ -DataType fromDbzColumn(Column splitColumn); +protected abstract DataType fromDbzColumn(Column splitColumn); + +/** Returns the distribution factor of the given table. */ +protected double calculateDistributionFactor( +TableId tableId, Object min, Object max, long approximateRowCnt) { + +if (!min.getClass().equals(max.getClass())) { +throw new IllegalStateException( +String.format( +"Unsupported operation type, the MIN value type %s is different with MAX value type %s.", +min.getClass().getSimpleName(), max.getClass().getSimpleName())); +} +if (approximateRowCnt == 0) { +return Double.MAX_VALUE; +} +BigDecimal difference = ObjectUtils.minus(max, min); +// factor = (max - min + 1) / rowCount +final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1)); +double distributionFactor = +subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue(); +LOG.info( +"The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}", +tableId, +distributionFactor, +min, +max, +approximateRowCnt); +return distributionFactor; +} + +/** + * Get the column which is seen as chunk key. + * + * @param table table identity. + * @param chunkKeyColumn column name which is seen as chunk key, if chunkKeyColumn is null, use + * primary key instead. @Column the column which is seen as chunk key. + */ +protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) { +return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn); +} + +/** ChunkEnd less than or equal to max. */ +protected boolean isChunkEndLeMax(Object chunkEnd, Object max) { +return ObjectUtils.compare(chunkEnd, max) <= 0; +} + +/** ChunkEnd greater than or equal to max. */ +protected boolean isChunkEndGeMax(Object chunkEnd, Object max) { +return ObjectUtils.compare(chunkEnd, max) >= 0; +} /** * convert dbz column to Flink row type. * * @param splitColumn split column. * @return flink row type. */ -default RowType getSplitType(Column splitColumn) { +private RowType getSplitType(Column splitColumn) { return (RowType) ROW(FIELD(splitColumn.name(), fromDbzColumn(splitColumn))).getLogicalType(); } + +/** + * We can use evenly-sized chunks or unevenly-sized chunks when split table into chunks, using + * evenly-sized chunks which is much efficient, using unevenly-sized chunks which will request + * many queries and is not efficient. 
+ */ +private List splitTableIntoChunks( +JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { +final String splitColumnName = splitColumn.name(); +final Object[] minMax = JdbcChunkUtils.queryMinMax(jdbc, tableId, splitColumnName); +final Object min = minMax[0]; +final Object max = minMax[1]; +if (min == null || max == null || min.equals(max)) { +// empty table, or only one row, return full table scan as a chunk +return Collections.singletonList(ChunkRange.all()); +} + +final int chunkSize = sourceConfig.getSplitSize(); +final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); +final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); + +if (isEvenlySplitColumn(splitColumn)) { +long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId); +double distributionFactor = +calculateDistributionFactor(tableId, min, max, approximateRowCnt); + +boolean dataIsEvenlyDistributed = +doubleCompare(distributionFactor, distributionFactorLower) >= 0 +&& doubleCompare(distributionFactor, distributionFactorUpper) <= 0; + +if (dataIsEvenlyDistributed) { +// the minimum dynamic chunk size is at least 1 +final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1); +return splitEvenlySizedChunks( +
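As a quick illustration of the distribution-factor logic being consolidated into `JdbcSourceChunkSplitter` above, the following standalone sketch reproduces the arithmetic for made-up sample statistics, assuming the usual connector defaults (chunk size 8096, distribution factor bounds 0.05 and 1000.0). It is not connector code, just the formula `factor = (max - min + 1) / rowCount` worked through once.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DistributionFactorSketch {

    public static void main(String[] args) {
        // Example split column statistics: min = 1, max = 1_000_000, ~800_000 rows.
        BigDecimal min = BigDecimal.valueOf(1);
        BigDecimal max = BigDecimal.valueOf(1_000_000);
        long approximateRowCnt = 800_000L;
        int chunkSize = 8096;

        // factor = (max - min + 1) / rowCount, rounded up to 4 decimal places.
        BigDecimal difference = max.subtract(min).add(BigDecimal.ONE);
        double distributionFactor =
                difference
                        .divide(BigDecimal.valueOf(approximateRowCnt), 4, RoundingMode.CEILING)
                        .doubleValue();

        // 1.25 lies between the lower (0.05) and upper (1000.0) bounds, so the data counts
        // as evenly distributed and the chunk size is scaled by the factor.
        int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);

        System.out.printf(
                "distributionFactor=%.4f, dynamicChunkSize=%d%n",
                distributionFactor, dynamicChunkSize);
        // Prints: distributionFactor=1.2500, dynamicChunkSize=10120
    }
}
```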
Re: [PR] [FLINK-35704] Use ForkJoinPool in NonSplittingRecursiveEnumerator [flink]
gliter commented on PR #24986: URL: https://github.com/apache/flink/pull/24986#issuecomment-2194649007 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35704) ForkJoinPool introduction to NonSplittingRecursiveEnumerator to vastly improve enumeration performance
[ https://issues.apache.org/jira/browse/FLINK-35704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35704: --- Labels: pull-request-available (was: ) > ForkJoinPool introduction to NonSplittingRecursiveEnumerator to vastly > improve enumeration performance > -- > > Key: FLINK-35704 > URL: https://issues.apache.org/jira/browse/FLINK-35704 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Grzegorz Liter >Priority: Minor > Labels: pull-request-available > Attachments: ParallelNonSplittingRecursiveEnumerator.java > > > In the current implementation of NonSplittingRecursiveEnumerator, files and > directories are enumerated sequentially. When accessing a remote storage > like S3, a vast amount of time is wasted waiting for responses. > What is worse, the enumeration is done by the JobManager itself, which is > unresponsive to RPC calls while enumeration runs. When accessing many (thousands of) files, > the wait time can quickly add up and can cause a Pekko timeout. > The performance can be improved by enumerating files in parallel with, e.g., > a ForkJoinPool and parallel streams. I am attaching an example implementation that > I am happy to contribute to the Flink repository. > In my tests it cuts the time by at least 10x. -- This message was sent by Atlassian Jira (v8.20.10#820010)
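The idea described in the ticket, stripped of Flink specifics, looks roughly like the sketch below. It is illustrative only: it uses `java.nio.file` rather than Flink's `FileSystem`/`FileStatus` abstraction and does not reproduce the attached `ParallelNonSplittingRecursiveEnumerator`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ParallelEnumerationSketch {

    /** Recursively list regular files under the given roots using a dedicated ForkJoinPool. */
    public static List<Path> enumerate(List<Path> roots, int parallelism) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // Submitting the parallel stream to a dedicated pool keeps the work off the
            // common pool (and, in Flink's case, off the JobManager's main thread).
            return pool.submit(() ->
                            roots.parallelStream()
                                    .flatMap(ParallelEnumerationSketch::listRecursively)
                                    .collect(Collectors.toList()))
                    .get();
        } finally {
            pool.shutdown();
        }
    }

    private static Stream<Path> listRecursively(Path dir) {
        try (Stream<Path> children = Files.list(dir)) {
            // Materialize the directory listing before the stream is closed.
            List<Path> entries = children.collect(Collectors.toList());
            return entries.parallelStream()
                    .flatMap(p -> Files.isDirectory(p) ? listRecursively(p) : Stream.of(p))
                    // Collect eagerly so all recursive I/O happens inside the pool.
                    .collect(Collectors.toList())
                    .stream();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The same pattern applies to Flink's `FileSystem.listStatus` calls; the gain comes from overlapping the blocking remote listings instead of issuing them one after another.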
[PR] [FLINK-35704] Use ForkJoinPool in NonSplittingRecursiveEnumerator [flink]
gliter opened a new pull request, #24986: URL: https://github.com/apache/flink/pull/24986 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35704] Use ForkJoinPool in NonSplittingRecursiveEnumerator [flink]
flinkbot commented on PR #24986: URL: https://github.com/apache/flink/pull/24986#issuecomment-2191696248 ## CI report: * 5cc83326c8958e4795b9d77d11d0c2793ad7366c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35704] Use ForkJoinPool in NonSplittingRecursiveEnumerator [flink]
gliter commented on PR #24986: URL: https://github.com/apache/flink/pull/24986#issuecomment-2191702558 I would prefer the code to be structured a bit differently, mainly by not creating an ArrayList just to pass it as a parameter, and instead returning a stream from `addSplitsForPath`. However, it has to stay like this to preserve the signature of this method, which is effectively a public API for extending `NonSplittingRecursiveEnumerator` and is overridden by a few classes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
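The trade-off described in that comment can be seen by comparing the two shapes below. These are paraphrased, simplified signatures for illustration only, not the actual `NonSplittingRecursiveEnumerator` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Paraphrased shape of the existing extension point: subclasses override a method
// that appends splits into a caller-provided collection.
abstract class CollectorStyleEnumerator<S> {
    protected abstract void addSplitsForPath(String path, ArrayList<S> target);

    public List<S> enumerate(List<String> paths) {
        ArrayList<S> splits = new ArrayList<>();
        paths.forEach(p -> addSplitsForPath(p, splits));
        return splits;
    }
}

// The alternative shape mentioned in the comment: returning a Stream composes more
// naturally with parallel enumeration, but changing to it would break subclasses
// that already override the collector-style method.
abstract class StreamStyleEnumerator<S> {
    protected abstract Stream<S> splitsForPath(String path);

    public List<S> enumerate(List<String> paths) {
        return paths.parallelStream()
                .flatMap(this::splitsForPath)
                .collect(Collectors.toList());
    }
}
```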