[PR] [FLINK-33211][table] support flink table lineage [flink]

2024-07-02 Thread via GitHub


HuangZhenQiu opened a new pull request, #25012:
URL: https://github.com/apache/flink/pull/25012

   ## What is the purpose of the change
   1. Add a Table Lineage Vertex to each transformation in the planner. The final 
LineageGraph is generated from the transformations and put into the StreamGraph. The 
lineage graph will be published to the Lineage Listener in a follow-up PR.
   2. Deprecated table sources and sinks are not considered, because they do not 
provide enough information to derive the name and namespace of a lineage dataset.
   
   ## Brief change log
 - add table lineage interface and default implementations
 - create lineage vertices and add them to the transformations during the 
physical-plan-to-transformation conversion. 
   
   ## Verifying this change
   1. Add TableLineageGraphTest for both stream and batch.
   2. Added LineageGraph verification in TransformationsTest for legacy sources.

   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable )


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35697) Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink

2024-07-02 Thread Muhammet Orazov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862672#comment-17862672
 ] 

Muhammet Orazov commented on FLINK-35697:
-

Hey [~chalixar],

Thanks for the update!

I'll look into it and try to finalise it in the next few days.

> Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink
> -
>
> Key: FLINK-35697
> URL: https://issues.apache.org/jira/browse/FLINK-35697
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Ahmed Hamdy
>Priority: Blocker
> Fix For: 1.20.0
>
>
> h2. Description
> In FLIP-451 we added a timeout configuration to {{AsyncSinkWriter}}, with a 
> default timeout of 10 minutes and failOnTimeout defaulting to false. 
> We need to test the new feature on different levels:
> - Functional Testing
> - Performance Testing
> - Regression Testing
> h2. Common Utils
> The feature affects the abstract {{AsyncSinkWriter}} class, so we need 
> to use a concrete sink implementation for our tests. Any implementation where we can 
> track delivery of elements is acceptable; an example is:
> {code}
> class DiscardingElementWriter extends AsyncSinkWriter<T, String> {
>     SeparateThreadExecutor executor =
>             new SeparateThreadExecutor(r -> new Thread(r, "DiscardingElementWriter"));
>
>     public DiscardingElementWriter(
>             Sink.InitContext context,
>             AsyncSinkWriterConfiguration configuration,
>             Collection<BufferedRequestState<String>> bufferedRequestStates) {
>         super(
>                 (element, context1) -> element.toString(),
>                 context,
>                 configuration,
>                 bufferedRequestStates);
>     }
>
>     @Override
>     protected long getSizeInBytes(String requestEntry) {
>         return requestEntry.length();
>     }
>
>     @Override
>     protected void submitRequestEntries(
>             List<String> requestEntries, ResultHandler<String> resultHandler) {
>         executor.execute(
>                 () -> {
>                     long delayMillis = new Random().nextInt(5000);
>                     try {
>                         Thread.sleep(delayMillis);
>                     } catch (InterruptedException ignored) {
>                     }
>                     for (String entry : requestEntries) {
>                         LOG.info("Discarding {} after {} ms", entry, delayMillis);
>                     }
>                     resultHandler.complete();
>                 });
>     }
> }
> {code}
> We will also need a simple Flink Job that writes data using the sink
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment
> .getExecutionEnvironment();
> env.setParallelism(1);
> env.fromSequence(0, 100)
> .map(Object::toString)
> .sinkTo(new DiscardingTestAsyncSink<>());
> {code}
> We can use the smallest values for batch size and in-flight requests to increase 
> the number of requests that are subject to the timeout
> {code}
> public class DiscardingTestAsyncSink<T> extends AsyncSinkBase<T, String> {
>     private static final Logger LOG =
>             LoggerFactory.getLogger(DiscardingTestAsyncSink.class);
>
>     public DiscardingTestAsyncSink(long requestTimeoutMS, boolean failOnTimeout) {
>         super(
>                 (element, context) -> element.toString(),
>                 1, // maxBatchSize
>                 1, // maxInFlightRequests
>                 10, // maxBufferedRequests
>                 1000L, // maxBatchSizeInBytes
>                 100, // maxTimeInBufferMS
>                 500L, // maxRecordSizeInBytes
>                 requestTimeoutMS,
>                 failOnTimeout);
>     }
>
>     @Override
>     public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
>         return new DiscardingElementWriter(
>                 new InitContextWrapper(context),
>                 AsyncSinkWriterConfiguration.builder()
>                         .setMaxBatchSize(this.getMaxBatchSize())
>                         .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
>                         .setMaxInFlightRequests(this.getMaxInFlightRequests())
>                         .setMaxBufferedRequests(this.getMaxBufferedRequests())
>                         .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
>                         .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
>                         .setFailOnTimeout(this.getFailOnTimeout())
>                         .setRequestTimeoutMS(this.getRequestTimeoutMS())
>                         .build(),
>                 Collections.emptyList());
>  

[jira] [Created] (FLINK-35747) customer ‘rest.bind-address' config overwrite by code

2024-07-02 Thread dncba (Jira)
dncba created FLINK-35747:
-

 Summary:  customer ‘rest.bind-address' config overwrite by code
 Key: FLINK-35747
 URL: https://issues.apache.org/jira/browse/FLINK-35747
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.19.1
Reporter: dncba


When I tried to bind the Flink-on-YARN web UI to 0.0.0.0 so that it listens on both 
IPv4 and IPv6 (dual stack), I found that the 'rest.bind-address' config is 
automatically overwritten here:
{code:java}
package org.apache.flink.yarn.entrypoint;

public class YarnEntrypointUtils {

    public static Configuration loadConfiguration(/* ... */) {

        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(workingDirectory, dynamicParameters);

        final String hostname = env.get(ApplicationConstants.Environment.NM_HOST.key());
        configuration.set(JobManagerOptions.ADDRESS, hostname);
        configuration.set(RestOptions.ADDRESS, hostname);

        // the bind address is overwritten with the hostname here
        configuration.set(RestOptions.BIND_ADDRESS, hostname);

        // ...
    }
}
{code}
In most cases this is right. But when a user wants to configure 'rest.bind-address' 
themselves, the user's setting is automatically overwritten.

 

The best approach is to check the user config before overwriting it, like this:

 
{code:java}


public class YarnEntrypointUtils {

    public static Configuration loadConfiguration(/* ... */) {

        final Configuration configuration =
                GlobalConfiguration.loadConfiguration(workingDirectory, dynamicParameters);

        final String hostname = env.get(ApplicationConstants.Environment.NM_HOST.key());
        configuration.set(JobManagerOptions.ADDRESS, hostname);
        configuration.set(RestOptions.ADDRESS, hostname);

        // check the user config before overwriting it
        String bindAddress = configuration.getString(RestOptions.BIND_ADDRESS);
        if (StringUtils.isBlank(bindAddress)) {
            configuration.setString(RestOptions.BIND_ADDRESS, hostname);
        }

        // ...
    }
}

{code}
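
The check-before-overwrite idea proposed above can be tried out independently of Flink. The following is only a minimal sketch that uses a plain `Map` in place of Flink's `Configuration`; the class and method names (`BindAddressDefaulting`, `applyHostDefaults`) and the host name `nm-host-1` are assumptions made for this illustration and are not part of `YarnEntrypointUtils`. Only the three option keys are taken from Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the logic discussed in this issue; not Flink code.
class BindAddressDefaulting {
    static final String REST_BIND_ADDRESS = "rest.bind-address";

    static Map<String, String> applyHostDefaults(Map<String, String> userConfig, String hostname) {
        Map<String, String> effective = new HashMap<>(userConfig);
        // These two are always set to the NodeManager host, as in the snippet above.
        effective.put("jobmanager.rpc.address", hostname);
        effective.put("rest.address", hostname);
        // Fall back to the hostname only when the user left the bind address unset or blank.
        String bindAddress = effective.get(REST_BIND_ADDRESS);
        if (bindAddress == null || bindAddress.trim().isEmpty()) {
            effective.put(REST_BIND_ADDRESS, hostname);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put(REST_BIND_ADDRESS, "0.0.0.0");
        // The user-provided value survives.
        System.out.println(applyHostDefaults(user, "nm-host-1").get(REST_BIND_ADDRESS));
        // Without a user value, the hostname is used.
        System.out.println(applyHostDefaults(new HashMap<>(), "nm-host-1").get(REST_BIND_ADDRESS));
    }
}
```

With this shape, a user-supplied `0.0.0.0` is preserved, while jobs that do not set the option keep today's behaviour.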



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34487) Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly workflow

2024-07-02 Thread Muhammet Orazov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862670#comment-17862670
 ] 

Muhammet Orazov commented on FLINK-34487:
-

Hey [~mapohl] ,

 

Sure, I will create similar PRs for the other branches (fingers crossed)

> Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly 
> workflow
> -
>
> Key: FLINK-34487
> URL: https://issues.apache.org/jira/browse/FLINK-34487
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System / CI
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Assignee: Muhammet Orazov
>Priority: Major
>  Labels: github-actions, pull-request-available
> Fix For: 2.0.0
>
>
> Analogously to the [Azure Pipelines nightly 
> config|https://github.com/apache/flink/blob/e923d4060b6dabe650a8950774d176d3e92437c2/tools/azure-pipelines/build-apache-repo.yml#L183]
>  we want to generate the wheels artifacts in the GHA nightly workflow as well.





Re: [PR] [FLINK-34487][ci] Adds Python Wheels nightly GHA workflow [flink]

2024-07-02 Thread via GitHub


morazow commented on PR #24426:
URL: https://github.com/apache/flink/pull/24426#issuecomment-2205212666

   Great, thanks @XComp, @HuangXingBo for the help 👍 
   
   





[jira] [Comment Edited] (FLINK-33211) Implement table lineage graph

2024-07-02 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862668#comment-17862668
 ] 

Matthias Pohl edited comment on FLINK-33211 at 7/3/24 6:40 AM:
---

I reverted 
[960363cd3c6c82f7e56ef781295756105f7b5eba|https://github.com/apache/flink/commit/960363cd3c6c82f7e56ef781295756105f7b5eba]
 due to the [compilation 
failures|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60624&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=6598]
 in {{master}} build 20240703.5 to unblock {{master}} again.

I decided against just fixing it because it appears to be a conflict resolution 
issue. Please go ahead and create another PR and make sure that there are no 
other hidden conflicts introduced with this change. Sorry for the extra work.

Additionally, please be transparent about the fixed version and commit 
information in the Jira issues: It's common practice to update the Fix Version 
in the Jira issues and add the commit hash and branch as part of the resolution 
comment.


was (Author: mapohl):
I reverted 
[960363cd3c6c82f7e56ef781295756105f7b5eba|https://github.com/apache/flink/commit/960363cd3c6c82f7e56ef781295756105f7b5eba]
 due to the [compilation 
failures|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60624&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=6598]
 in {{master}} build 20240703.5.

I decided against just fixing it because it appears to be a conflict resolution 
issue. Please go ahead and create another PR and make sure that there are no 
other hidden conflicts introduced with this change. Sorry for the extra work.

Additionally, please be transparent about the fixed version and commit 
information in the Jira issues: It's common practice to update the Fix Version 
in the Jira issues and add the commit hash and branch as part of the resolution 
comment.

> Implement table lineage graph
> -
>
> Key: FLINK-33211
> URL: https://issues.apache.org/jira/browse/FLINK-33211
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Fang Yong
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Implement table lineage graph





[jira] [Reopened] (FLINK-33211) Implement table lineage graph

2024-07-02 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reopened FLINK-33211:
---

I reverted 
[960363cd3c6c82f7e56ef781295756105f7b5eba|https://github.com/apache/flink/commit/960363cd3c6c82f7e56ef781295756105f7b5eba]
 due to the [compilation 
failures|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60624&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=6598]
 in {{master}} build 20240703.5.

I decided against just fixing it because it appears to be a conflict resolution 
issue. Please go ahead and create another PR and make sure that there are no 
other hidden conflicts introduced with this change. Sorry for the extra work.

Additionally, please be transparent about the fixed version and commit 
information in the Jira issues: It's common practice to update the Fix Version 
in the Jira issues and add the commit hash and branch as part of the resolution 
comment.

> Implement table lineage graph
> -
>
> Key: FLINK-33211
> URL: https://issues.apache.org/jira/browse/FLINK-33211
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Fang Yong
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Implement table lineage graph





[jira] [Created] (FLINK-35746) Add logic to override observed config based on settings observed through REST API

2024-07-02 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-35746:
--

 Summary: Add logic to override observed config based on settings 
observed through REST API
 Key: FLINK-35746
 URL: https://issues.apache.org/jira/browse/FLINK-35746
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.10.0


In many cases the Flink operator relies on the user configuration to infer 
running application/cluster settings. While this is mostly fine, many of these 
configs can be changed programmatically by the user in their application's main 
method, which ultimately takes precedence and can lead to inconsistent behaviour 
on the operator's side.

To alleviate this, we need to add logic that overrides parts of the observed config 
based on what we see from the cluster, such as whether checkpointing is enabled or 
disabled, the checkpoint interval, etc.
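
As a rough illustration of the idea (not the operator's actual implementation), the override could look like the following sketch. It assumes a plain `Map` for the observed config and a hypothetical `ObservedCheckpointing` holder for what was read from the running job; only the option key `execution.checkpointing.interval` is taken from Flink, everything else is invented for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: values seen on the cluster take precedence over the user config.
class ObservedConfigOverride {
    static final String CHECKPOINT_INTERVAL = "execution.checkpointing.interval";

    // Hypothetical holder for settings observed on the running job.
    static final class ObservedCheckpointing {
        final boolean enabled;
        final long intervalMs;

        ObservedCheckpointing(boolean enabled, long intervalMs) {
            this.enabled = enabled;
            this.intervalMs = intervalMs;
        }
    }

    static Map<String, String> override(
            Map<String, String> userConfig, ObservedCheckpointing observed) {
        Map<String, String> effective = new HashMap<>(userConfig);
        if (observed.enabled) {
            // The interval the job really runs with wins over the configured one.
            effective.put(CHECKPOINT_INTERVAL, observed.intervalMs + " ms");
        } else {
            // Checkpointing was disabled in the job, so drop the configured interval.
            effective.remove(CHECKPOINT_INTERVAL);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put(CHECKPOINT_INTERVAL, "60 s");
        System.out.println(override(user, new ObservedCheckpointing(true, 10_000L)));
        System.out.println(override(user, new ObservedCheckpointing(false, 0L)));
    }
}
```

The user-provided map is copied rather than mutated, so the original spec stays available for comparison.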





Re: [PR] [FLINK-35552][runtime] Moves CheckpointStatsTracker out of DefaultExecutionGraphFactory into Scheduler [flink]

2024-07-02 Thread via GitHub


XComp commented on PR #24911:
URL: https://github.com/apache/flink/pull/24911#issuecomment-2205155149

   [CI with AdaptiveScheduler 
enabled](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=60616)
 was successful. I'm gonna go ahead and prepare this PR to be merged (i.e. 
remove the DO-NOT-MERGE commit).





[jira] [Reopened] (FLINK-34487) Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly workflow

2024-07-02 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reopened FLINK-34487:
---

Ok, I was wrong with my assumption that we don't need backports. The Python 
wheels are created for 
[master|https://github.com/apache/flink/actions/runs/9770777281] but not for 
the other nightlies (e.g. 
[release-1.19|https://github.com/apache/flink/actions/runs/9770777344]).

[~m.orazow] Can you prepare backports for 1.20, 1.19 and 1.18?

> Integrate tools/azure-pipelines/build-python-wheels.yml into GHA nightly 
> workflow
> -
>
> Key: FLINK-34487
> URL: https://issues.apache.org/jira/browse/FLINK-34487
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System / CI
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Assignee: Muhammet Orazov
>Priority: Major
>  Labels: github-actions, pull-request-available
> Fix For: 2.0.0
>
>
> Analogously to the [Azure Pipelines nightly 
> config|https://github.com/apache/flink/blob/e923d4060b6dabe650a8950774d176d3e92437c2/tools/azure-pipelines/build-apache-repo.yml#L183]
>  we want to generate the wheels artifacts in the GHA nightly workflow as well.





Re: [PR] [FLINK-35318][table-planner] use UTC timezone to handle TIMESTAMP_WITHOUT_TIME_ZONE type in RexNodeToExpressionConverter [flink]

2024-07-02 Thread via GitHub


leonardBang commented on code in PR #24787:
URL: https://github.com/apache/flink/pull/24787#discussion_r1663518072


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala:
##
@@ -455,7 +449,7 @@ class RexNodeToExpressionConverter(
 
   case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
 val v = literal.getValueAs(classOf[TimestampString])
-toLocalDateTime(v).atZone(timeZone.toZoneId).toInstant
+toLocalDateTime(v).atZone(ZoneId.of(ZoneOffset.UTC.getId)).toInstant

Review Comment:
   Thanks @lshangq for the information; evaluating it once makes sense to me.
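
For readers following the diff above: the zone passed to `atZone` decides which instant a timestamp literal maps to. The snippet below is a small standalone `java.time` illustration, not code from the PR; the class name, the sample literal and the `Asia/Shanghai` zone are chosen only for this example.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Standalone illustration of how the chosen zone changes the resulting Instant.
class TimestampLiteralZones {
    static Instant toInstant(LocalDateTime literal, ZoneId zone) {
        return literal.atZone(zone).toInstant();
    }

    public static void main(String[] args) {
        LocalDateTime literal = LocalDateTime.of(2024, 1, 1, 0, 0, 0);

        Instant utc = toInstant(literal, ZoneOffset.UTC);
        Instant shanghai = toInstant(literal, ZoneId.of("Asia/Shanghai"));

        System.out.println(utc);                              // 2024-01-01T00:00:00Z
        System.out.println(shanghai);                         // 2023-12-31T16:00:00Z
        System.out.println(Duration.between(shanghai, utc));  // PT8H

        // ZoneId.of("Z") resolves to ZoneOffset.UTC, so both spellings are equivalent.
        System.out.println(ZoneId.of(ZoneOffset.UTC.getId()).equals(ZoneOffset.UTC)); // true
    }
}
```

Since `ZoneOffset` is itself a `ZoneId`, `ZoneOffset.UTC` can be passed to `atZone` directly.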






[PR] [cdc-connector][jdbc-db2]Flink cdc pipeline support db2 source [flink-cdc]

2024-07-02 Thread via GitHub


ChengJie1053 opened a new pull request, #3450:
URL: https://github.com/apache/flink-cdc/pull/3450

   Flink cdc pipeline support db2 source
   
   
![image](https://github.com/apache/flink-cdc/assets/125547374/69eadc27-6a90-4bf4-97e1-4a6746d14db8)
   
   
![image](https://github.com/apache/flink-cdc/assets/125547374/95ed055b-f2f4-4410-aad9-059870714c11)
   
   
![image](https://github.com/apache/flink-cdc/assets/125547374/ffbdc504-8675-40ad-8a17-2e1de4995a8f)
   





Re: [PR] [FLINK-35643][doc] Add materialized table statement doc [flink]

2024-07-02 Thread via GitHub


Myasuka commented on code in PR #24975:
URL: https://github.com/apache/flink/pull/24975#discussion_r1663472146


##
docs/content/docs/dev/table/materialized-table/statements.md:
##
@@ -0,0 +1,344 @@
+---
+title: Statements
+weight: 2
+type: docs
+aliases:
+- /dev/table/materialized-table/statements.html
+---
+
+
+# Description
+
+Flink SQL supports the following Materialized Table statements for now:
+- [CREATE MATERIALIZED TABLE](#create-materialized-table)
+- [Alter MATERIALIZED TABLE](#alter-materialized-table)
+- [DROP MATERIALIZED TABLE](#drop-materialized-table)
+
+# CREATE MATERIALIZED TABLE
+
+```
+CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
+
+[ ([ <table_constraint> ]) ]
+
+[COMMENT table_comment]
+
+[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
+
+[WITH (key1=val1, key2=val2, ...)]
+
+FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
+
+[REFRESH_MODE = { CONTINUOUS | FULL }]
+
+AS <select_statement>
+
+<table_constraint>:
+  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
+```
+
+### PRIMARY KEY
+
+PRIMARY KEY defines an optional list of columns that uniquely identifies each 
row within the table. The column as the primary key must be non-null.
+
+### PARTITIONED BY
+
+PARTITIONED BY define an optional list of columns to partition the 
materialized table. A directory is created for each partition if this 
materialized table is used as a filesystem sink.
+
+**Example:**
+
+```sql
+-- Create a materialized table and specify the partition field as `ds`.
+CREATE MATERIALIZED TABLE my_materialized_table
+PARTITIONED BY (ds)
+FRESHNESS = INTERVAL '1' HOUR
+AS SELECT 
+ds
+FROM
+...
+```
+
+Note
+- The partition column must be included in the query statement of the 
materialized table.
+
+### WITH Options
+
+WITH Options are used to specify the materialized table properties, including 
[connector options]({{< ref "docs/connectors/table/" >}}) and [time format 
option]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 
for partition fields.
+
+```sql
+-- Create a materialized table, specify the partition field as 'ds', and the corresponding time format as 'yyyy-MM-dd'
+CREATE MATERIALIZED TABLE my_materialized_table
+PARTITIONED BY (ds)
+WITH (
+'format' = 'json',
+'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
+)
+...
+```
+
+As shown in the above example, we specified the date-formatter option for the 
ds partition column. During each scheduling, the scheduling time will be 
converted to the ds partition value. For example, for a scheduling time of 
2024-01-01 00:00:00, only the partition ds = '2024-01-01' will be refreshed.
+
+Note
+- The `partition.fields.#.date-formatter` option only works in full mode.
+- The field in the [partition.fields.#.date-formatter]({{< ref 
"docs/dev/table/config" >}}#partition-fields-date-formatter) must be a valid 
string type partition field.
+
+### FRESHNESS
+
+**FRESHNESS Definition and Refresh Mode Relationship**
+
+FRESHNESS defines the maximum amount of time that the materialized table’s 
content should lag behind updates to the base tables. It does two things, 
firstly it determines the [refresh mode]({{< ref 
"docs/dev/table/materialized-table/overview" >}}#refresh-mode) of the 
materialized table through [configuration]({{< ref "docs/dev/table/config" 
>}}#materialized-table-refresh-mode-freshness-threshold), followed by 
determines the data refresh frequency to meet the actual data freshness 
requirements.
+
+**Detailed Explanation of FRESHNESS Parameter**
+
+The FRESHNESS parameter range is INTERVAL `'<num>'` { SECOND | MINUTE | HOUR | 
DAY }. `'<num>'` must be a positive integer, and in FULL mode, `'<num>'` should 
be a common divisor of the respective time interval.
+
+**Examples:**
+(Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes)
+
+```sql
+-- The corresponding refresh pipeline is a streaming job with a checkpoint 
interval of 1 second
+FRESHNESS = INTERVAL '1' SECOND

Review Comment:
   Since the checkpoint interval is currently bound to the `freshness` setting, 
I think we should warn users that a stricter freshness puts more load on 
checkpointing. We could use a longer freshness in the example and tell users to 
consider the [changelog 
state-backend](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#enabling-changelog).
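
   To make the coupling between `freshness` and the checkpoint interval concrete, 
here is a minimal sketch of how the refresh mode could be derived. It is an 
illustration only, not Flink's implementation: the class and method names are 
hypothetical, and it assumes that a freshness below 
`materialized-table.refresh-mode.freshness-threshold` selects CONTINUOUS mode (with 
the checkpoint interval equal to the freshness) and anything else selects FULL mode.

```java
import java.time.Duration;

// Hypothetical sketch of deriving the refresh mode from FRESHNESS; not Flink code.
class FreshnessRefreshMode {
    enum RefreshMode { CONTINUOUS, FULL }

    // Assumption: freshness strictly below the threshold means a streaming (CONTINUOUS) job.
    static RefreshMode deriveRefreshMode(Duration freshness, Duration threshold) {
        return freshness.compareTo(threshold) < 0 ? RefreshMode.CONTINUOUS : RefreshMode.FULL;
    }

    // Assumption: in CONTINUOUS mode the checkpoint interval follows the freshness.
    static Duration checkpointInterval(Duration freshness) {
        return freshness;
    }

    public static void main(String[] args) {
        Duration threshold = Duration.ofMinutes(30);

        Duration oneSecond = Duration.ofSeconds(1);
        System.out.println(deriveRefreshMode(oneSecond, threshold)); // CONTINUOUS
        System.out.println(checkpointInterval(oneSecond));           // PT1S

        System.out.println(deriveRefreshMode(Duration.ofHours(1), threshold)); // FULL
    }
}
```

   Under these assumptions, a freshness of one second means a checkpoint every 
second, which is the cost the warning should call out.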



##
docs/content/docs/dev/table/materialized-table/overview.md:
##
@@ -0,0 +1,65 @@
+---
+title: Overview
+weight: 1
+type: docs
+aliases:
+- /dev/table/materialized-table.html
+---
+
+
+# Introduction
+
+Materialized Table is a new table type introduced in Flink SQL, aimed at 
simplifying both batch and stream data pipelines, providing a consistent 
development experience. By specifying data freshness and query when creating 
Materialized, the engine automatically derives the schema for the materialized 
table and creates corresponding data ref

[jira] [Commented] (FLINK-35690) Release Testing: Verify FLIP-459: Support Flink hybrid shuffle integration with Apache Celeborn

2024-07-02 Thread xuhuang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862653#comment-17862653
 ] 

xuhuang commented on FLINK-35690:
-

Hi [~tanyuxin].

    I followed your steps to implement a basic workflow, applying Celeborn as a 
single layer in Hybrid Shuffle. 

    My test results show that Flink can read, write and store Shuffle data 
through Celeborn, and the job finally completed successfully.  :P

> Release Testing: Verify FLIP-459: Support Flink hybrid shuffle integration 
> with Apache Celeborn
> ---
>
> Key: FLINK-35690
> URL: https://issues.apache.org/jira/browse/FLINK-35690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yuxin Tan
>Assignee: xuhuang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.20.0
>
>
> Follow up the test for https://issues.apache.org/jira/browse/FLINK-35533
> In Flink 1.20, we proposed integrating Flink's Hybrid Shuffle with Apache 
> Celeborn through a pluggable remote tier interface. To verify this feature, 
> follow these two main steps.
> 1. Implement Celeborn tier.
>  * Implement a new tier factory and tier for Celeborn, including the APIs 
> TierFactory/TierMasterAgent/TierProducerAgent/TierConsumerAgent.
>  * The implementations should support granular data management at the Segment 
> level for both client and server sides.
> 2. Use the implemented tier to shuffle data.
>  * Compile Flink and Celeborn.
>  * Deploy Celeborn service
>  ** Deploy a new Celeborn service with the new compiled packages. You can 
> reference the doc ([https://celeborn.apache.org/docs/latest/]) to deploy the 
> cluster.
>  * Add the compiled flink plugin jar (celeborn-client-flink-xxx.jar) to Flink 
> classpath.
>  * Configure the options to enable the feature.
>  ** Configure the option 
> taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class to the 
> new Celeborn tier classes. In addition to this option, the following options 
> should also be added.
> {code:java}
> execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL 
> celeborn.master.endpoints: 
> celeborn.client.shuffle.partition.type: MAP{code}
>  * Run some test examples(e.g., WordCount) to verify the feature.
>  





Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-02 Thread via GitHub


yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1663450759


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.runtime.parser.TransformParser;
+import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * The processor of transform projection applies to process a row of filtering tables.
+ *
+ * <p>A transform projection processor contains:
+ *
+ * <ul>
+ *   <li>CreateTableEvent: add the user-defined computed columns into Schema.
+ *   <li>SchemaChangeEvent: update the columns of TransformProjection.
+ *   <li>DataChangeEvent: Fill data field to row in PreTransformOperator. Process the data column
+ *       and the user-defined expression computed columns.
+ * </ul>
+ */
+public class PostTransformProcessor {
+private static final Logger LOG = LoggerFactory.getLogger(PostTransformProcessor.class);
+private TableInfo tableInfo;
+private TableChangeInfo tableChangeInfo;
+private TransformProjection transformProjection;
+private @Nullable TransformFilter transformFilter;
+private String timezone;
+private Map<String, ProjectionColumnProcessor> projectionColumnProcessorMap;
+private List<ProjectionColumn> cachedProjectionColumns;
+
+public PostTransformProcessor(
+TableInfo tableInfo,
+TableChangeInfo tableChangeInfo,
+TransformProjection transformProjection,
+@Nullable TransformFilter transformFilter,
+String timezone) {
+this.tableInfo = tableInfo;
+this.tableChangeInfo = tableChangeInfo;
+this.transformProjection = transformProjection;
+this.transformFilter = transformFilter;
+this.timezone = timezone;
+this.projectionColumnProcessorMap = new ConcurrentHashMap<>();
+this.cachedProjectionColumns = cacheProjectionColumnMap(tableInfo, 
transformProjection);
+}
+
+public boolean hasTableChangeInfo() {
+return this.tableChangeInfo != null;
+}
+
+public boolean hasTableInfo() {
+return this.tableInfo != null;
+}
+
+public static PostTransformProcessor of(
+TableInfo tableInfo,
+TransformProjection transformProjection,
+TransformFilter transformFilter,
+String timezone) {
+return new PostTransformProcessor(
+tableInfo, null, transformProjection, transformFilter, 
timezone);
+}
+
+public static PostTransformProcessor of(
+TableChangeInfo tableChangeInfo,
+TransformProjection transformProjection,
+TransformFilter transformFilter) {
+return new PostTransformProcessor(
+null, tableChangeInfo, transformProjection, transformFilter, 
null);
+}
+
+public static PostTransformProcessor of(
+TransformProjection transformProjection, TransformFilter 
transformFilter) {
+return new PostTransformProcessor(null, null, transformProjection, 
transformFilter, null);
+}
+
+public Schema processSchemaChangeEvent(Schema schema) {
+List<ProjectionColumn> projectionColumns =
+TransformParser.generateProjectionColumns(
+transformProjection.getProjection(), 
schema.getColumns());
+transformProjection.setProjectionColumns(projectionColumns);
+return schema.copy(
+projectionColumns.stream()
+.map(ProjectionColumn::getColumn)
+.collect(Collectors.toList()));

Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-02 Thread via GitHub


melin commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1663449011


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.runtime.parser.TransformParser;
+import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * The processor of transform projection applies to process a row of filtering 
tables.
+ *
+ * A transform projection processor contains:
+ *
+ * 
+ *   CreateTableEvent: add the user-defined computed columns into Schema.
+ *   SchemaChangeEvent: update the columns of TransformProjection.
+ *   DataChangeEvent: Fill data field to row in PreTransformOperator. 
Process the data column
+ *   and the user-defined expression computed columns.
+ * 
+ */
+public class PostTransformProcessor {
+private static final Logger LOG = 
LoggerFactory.getLogger(PostTransformProcessor.class);
+private TableInfo tableInfo;
+private TableChangeInfo tableChangeInfo;
+private TransformProjection transformProjection;
+private @Nullable TransformFilter transformFilter;
+private String timezone;
+private Map 
projectionColumnProcessorMap;
+private List cachedProjectionColumns;
+
+public PostTransformProcessor(
+TableInfo tableInfo,
+TableChangeInfo tableChangeInfo,
+TransformProjection transformProjection,
+@Nullable TransformFilter transformFilter,
+String timezone) {
+this.tableInfo = tableInfo;
+this.tableChangeInfo = tableChangeInfo;
+this.transformProjection = transformProjection;
+this.transformFilter = transformFilter;
+this.timezone = timezone;
+this.projectionColumnProcessorMap = new ConcurrentHashMap<>();
+this.cachedProjectionColumns = cacheProjectionColumnMap(tableInfo, 
transformProjection);
+}
+
+public boolean hasTableChangeInfo() {
+return this.tableChangeInfo != null;
+}
+
+public boolean hasTableInfo() {
+return this.tableInfo != null;
+}
+
+public static PostTransformProcessor of(
+TableInfo tableInfo,
+TransformProjection transformProjection,
+TransformFilter transformFilter,
+String timezone) {
+return new PostTransformProcessor(
+tableInfo, null, transformProjection, transformFilter, 
timezone);
+}
+
+public static PostTransformProcessor of(
+TableChangeInfo tableChangeInfo,
+TransformProjection transformProjection,
+TransformFilter transformFilter) {
+return new PostTransformProcessor(
+null, tableChangeInfo, transformProjection, transformFilter, 
null);
+}
+
+public static PostTransformProcessor of(
+TransformProjection transformProjection, TransformFilter 
transformFilter) {
+return new PostTransformProcessor(null, null, transformProjection, 
transformFilter, null);
+}
+
+public Schema processSchemaChangeEvent(Schema schema) {
+List projectionColumns =
+TransformParser.generateProjectionColumns(
+transformProjection.getProjection(), 
schema.getColumns());
+transformProjection.setProjectionColumns(projectionColumns);
+return schema.copy(
+projectionColumns.stream()
+.map(ProjectionColumn::getColumn)
+.collect(Collectors.toList()));
+  

[jira] [Resolved] (FLINK-35733) Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull

2024-07-02 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-35733.
-
Fix Version/s: 2.0.0
 Assignee: RocMarshal
   Resolution: Fixed

> Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull
> 
>
> Key: FLINK-35733
> URL: https://issues.apache.org/jira/browse/FLINK-35733
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: RocMarshal
>Assignee: RocMarshal
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35733) Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull

2024-07-02 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862642#comment-17862642
 ] 

Rui Fan commented on FLINK-35733:
-

Merged to 2.0.0 via:
 * 1db333158b8d0d338e4d116d39c65466066ef0c7
 * b782acfb9c04a18d8adb8e6596bd22e0e5ebe5c3

> Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull
> 
>
> Key: FLINK-35733
> URL: https://issues.apache.org/jira/browse/FLINK-35733
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: RocMarshal
>Priority: Minor
>  Labels: pull-request-available
>






[jira] [Updated] (FLINK-35733) Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35733:
---
Labels: pull-request-available  (was: )

> Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull
> 
>
> Key: FLINK-35733
> URL: https://issues.apache.org/jira/browse/FLINK-35733
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: RocMarshal
>Priority: Minor
>  Labels: pull-request-available
>






Re: [PR] [FLINK-35733] Change org.jetbrains.annotations.NotNull to javax.annotation.Nonnull [flink]

2024-07-02 Thread via GitHub


1996fanrui merged PR #25001:
URL: https://github.com/apache/flink/pull/25001





[jira] [Created] (FLINK-35745) Add namespace convention doc for Flink lineage

2024-07-02 Thread Zhenqiu Huang (Jira)
Zhenqiu Huang created FLINK-35745:
-

 Summary: Add namespace convention doc for Flink lineage 
 Key: FLINK-35745
 URL: https://issues.apache.org/jira/browse/FLINK-35745
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Zhenqiu Huang


We recommend following the naming convention from OpenLineage.
https://openlineage.io/docs/spec/naming/
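
As a rough illustration of what such a convention looks like, the sketch below builds a namespace and dataset name for a Kafka topic in the form the OpenLineage naming page describes (`kafka://{bootstrap server host}:{port}` for the namespace, the topic for the name). The class `KafkaDatasetNaming`, the host and the topic are hypothetical placeholders, not part of any Flink API; the convention actually documented for Flink may differ.

```java
public class KafkaDatasetNaming {

    // Namespace in the OpenLineage style for Kafka: kafka://{bootstrap server host}:{port}
    public static String namespace(String bootstrapHost, int port) {
        return "kafka://" + bootstrapHost + ":" + port;
    }

    // For Kafka, the dataset name is simply the topic.
    public static String name(String topic) {
        return topic;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        String ns = namespace("broker-1.example.com", 9092);
        String ds = name("orders");
        System.out.println(ns + " / " + ds);
    }
}
```

Keeping the namespace tied to the physical system and the name tied to the logical dataset lets the same topic be identified consistently across jobs.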





[jira] [Closed] (FLINK-33211) Implement table lineage graph

2024-07-02 Thread Zhenqiu Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenqiu Huang closed FLINK-33211.
-
Resolution: Done

The PR is merged to upstream

> Implement table lineage graph
> -
>
> Key: FLINK-33211
> URL: https://issues.apache.org/jira/browse/FLINK-33211
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Fang Yong
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Implement table lineage graph





[jira] [Closed] (FLINK-33212) Introduce job status changed listener for lineage

2024-07-02 Thread Zhenqiu Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenqiu Huang closed FLINK-33212.
-
Resolution: Done

The PR is merged to upstream.

> Introduce job status changed listener for lineage
> -
>
> Key: FLINK-33212
> URL: https://issues.apache.org/jira/browse/FLINK-33212
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Affects Versions: 1.20.0
>Reporter: Fang Yong
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Introduce the job status changed listener interfaces and their 
> implementation. The job listeners will be registered in the runtime and in the 
> client-side pipeline executors, including localExecutor, embeddedExecutor for 
> application mode, and the abstract session cluster executor. When job 
> submission succeeds, a job created event will be emitted with 
> lineage graph info.





[jira] [Commented] (FLINK-33211) Implement table lineage graph

2024-07-02 Thread Zhenqiu Huang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862638#comment-17862638
 ] 

Zhenqiu Huang commented on FLINK-33211:
---

PR is merged to upstream

> Implement table lineage graph
> -
>
> Key: FLINK-33211
> URL: https://issues.apache.org/jira/browse/FLINK-33211
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Fang Yong
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Implement table lineage graph





[jira] [Closed] (FLINK-35735) Don't list catalogs when closing session

2024-07-02 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang closed FLINK-35735.
-
Fix Version/s: 2.0.0
   Resolution: Fixed

> Don't list catalogs when closing session
> 
>
> Key: FLINK-35735
> URL: https://issues.apache.org/jira/browse/FLINK-35735
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.20.0
>Reporter: Shengkai Fang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>






[jira] [Commented] (FLINK-35735) Don't list catalogs when closing session

2024-07-02 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862632#comment-17862632
 ] 

Shengkai Fang commented on FLINK-35735:
---

Merged into master: af3e39fa4ce70187166e2f1dfef1ebfeee6d40cb

> Don't list catalogs when closing session
> 
>
> Key: FLINK-35735
> URL: https://issues.apache.org/jira/browse/FLINK-35735
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.20.0
>Reporter: Shengkai Fang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>






[jira] [Updated] (FLINK-35735) Don't list catalogs when closing session

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35735:
---
Labels: pull-request-available  (was: )

> Don't list catalogs when closing session
> 
>
> Key: FLINK-35735
> URL: https://issues.apache.org/jira/browse/FLINK-35735
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Affects Versions: 1.20.0
>Reporter: Shengkai Fang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>






Re: [PR] [FLINK-35735][sql-gateway] Don't list all catalogs when closing session [flink]

2024-07-02 Thread via GitHub


fsk119 merged PR #25010:
URL: https://github.com/apache/flink/pull/25010





Re: [PR] [FLINK-35318][table-planner] use UTC timezone to handle TIMESTAMP_WITHOUT_TIME_ZONE type in RexNodeToExpressionConverter [flink]

2024-07-02 Thread via GitHub


lshangq commented on PR #24787:
URL: https://github.com/apache/flink/pull/24787#issuecomment-2204906269

   @leonardBang Could you help take a look at this? Or how do you think it 
should be changed?
   
   





Re: [PR] [FLINK-33211][table] support flink table lineage [flink]

2024-07-02 Thread via GitHub


FangYongs merged PR #24618:
URL: https://github.com/apache/flink/pull/24618





Re: [PR] [FLINK-33211][table] support flink table lineage [flink]

2024-07-02 Thread via GitHub


FangYongs commented on PR #24618:
URL: https://github.com/apache/flink/pull/24618#issuecomment-2204890224

   Thanks @HuangZhenQiu , +1





Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-02 Thread via GitHub


yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1663376970


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.runtime.parser.TransformParser;
+import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * The processor of transform projection applies to process a row of filtering 
tables.
+ *
+ * A transform projection processor contains:
+ *
+ * 
+ *   CreateTableEvent: add the user-defined computed columns into Schema.
+ *   SchemaChangeEvent: update the columns of TransformProjection.
+ *   DataChangeEvent: Fill data field to row in PreTransformOperator. 
Process the data column
+ *   and the user-defined expression computed columns.
+ * 
+ */
+public class PostTransformProcessor {
+private static final Logger LOG = 
LoggerFactory.getLogger(PostTransformProcessor.class);
+private TableInfo tableInfo;
+private TableChangeInfo tableChangeInfo;
+private TransformProjection transformProjection;
+private @Nullable TransformFilter transformFilter;
+private String timezone;
+private Map 
projectionColumnProcessorMap;
+private List cachedProjectionColumns;
+
+public PostTransformProcessor(
+TableInfo tableInfo,
+TableChangeInfo tableChangeInfo,
+TransformProjection transformProjection,
+@Nullable TransformFilter transformFilter,
+String timezone) {
+this.tableInfo = tableInfo;
+this.tableChangeInfo = tableChangeInfo;
+this.transformProjection = transformProjection;
+this.transformFilter = transformFilter;
+this.timezone = timezone;
+this.projectionColumnProcessorMap = new ConcurrentHashMap<>();
+this.cachedProjectionColumns = cacheProjectionColumnMap(tableInfo, 
transformProjection);
+}
+
+public boolean hasTableChangeInfo() {
+return this.tableChangeInfo != null;
+}
+
+public boolean hasTableInfo() {
+return this.tableInfo != null;
+}
+
+public static PostTransformProcessor of(
+TableInfo tableInfo,
+TransformProjection transformProjection,
+TransformFilter transformFilter,
+String timezone) {
+return new PostTransformProcessor(
+tableInfo, null, transformProjection, transformFilter, 
timezone);
+}
+
+public static PostTransformProcessor of(
+TableChangeInfo tableChangeInfo,
+TransformProjection transformProjection,
+TransformFilter transformFilter) {
+return new PostTransformProcessor(
+null, tableChangeInfo, transformProjection, transformFilter, 
null);
+}
+
+public static PostTransformProcessor of(
+TransformProjection transformProjection, TransformFilter 
transformFilter) {
+return new PostTransformProcessor(null, null, transformProjection, 
transformFilter, null);
+}
+
+public Schema processSchemaChangeEvent(Schema schema) {
+List projectionColumns =
+TransformParser.generateProjectionColumns(
+transformProjection.getProjection(), 
schema.getColumns());
+transformProjection.setProjectionColumns(projectionColumns);
+return schema.copy(
+projectionColumns.stream()
+.map(ProjectionColumn::getColumn)
+.collect(Collectors.toList()));

Re: [PR] [FLINK-35272][cdc][runtime] Pipeline Transform job supports omitting / renaming calculation column [flink-cdc]

2024-07-02 Thread via GitHub


lvyanquan commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1663360564


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformProcessor.java:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.runtime.parser.TransformParser;
+import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * The processor of transform projection applies to process a row of filtering 
tables.
+ *
+ * A transform projection processor contains:
+ *
+ * 
+ *   CreateTableEvent: add the user-defined computed columns into Schema.
+ *   SchemaChangeEvent: update the columns of TransformProjection.
+ *   DataChangeEvent: Fill data field to row in PreTransformOperator. 
Process the data column
+ *   and the user-defined expression computed columns.
+ * 
+ */
+public class PostTransformProcessor {
+private static final Logger LOG = 
LoggerFactory.getLogger(PostTransformProcessor.class);
+private TableInfo tableInfo;
+private TableChangeInfo tableChangeInfo;
+private TransformProjection transformProjection;
+private @Nullable TransformFilter transformFilter;
+private String timezone;
+private Map 
projectionColumnProcessorMap;
+private List cachedProjectionColumns;
+
+public PostTransformProcessor(
+TableInfo tableInfo,
+TableChangeInfo tableChangeInfo,
+TransformProjection transformProjection,
+@Nullable TransformFilter transformFilter,
+String timezone) {
+this.tableInfo = tableInfo;
+this.tableChangeInfo = tableChangeInfo;
+this.transformProjection = transformProjection;
+this.transformFilter = transformFilter;
+this.timezone = timezone;
+this.projectionColumnProcessorMap = new ConcurrentHashMap<>();
+this.cachedProjectionColumns = cacheProjectionColumnMap(tableInfo, 
transformProjection);
+}
+
+public boolean hasTableChangeInfo() {
+return this.tableChangeInfo != null;
+}
+
+public boolean hasTableInfo() {
+return this.tableInfo != null;
+}
+
+public static PostTransformProcessor of(
+TableInfo tableInfo,
+TransformProjection transformProjection,
+TransformFilter transformFilter,
+String timezone) {
+return new PostTransformProcessor(
+tableInfo, null, transformProjection, transformFilter, 
timezone);
+}
+
+public static PostTransformProcessor of(
+TableChangeInfo tableChangeInfo,
+TransformProjection transformProjection,
+TransformFilter transformFilter) {
+return new PostTransformProcessor(
+null, tableChangeInfo, transformProjection, transformFilter, 
null);
+}
+
+public static PostTransformProcessor of(
+TransformProjection transformProjection, TransformFilter 
transformFilter) {
+return new PostTransformProcessor(null, null, transformProjection, 
transformFilter, null);
+}
+
+public Schema processSchemaChangeEvent(Schema schema) {
+List projectionColumns =
+TransformParser.generateProjectionColumns(
+transformProjection.getProjection(), 
schema.getColumns());
+transformProjection.setProjectionColumns(projectionColumns);
+return schema.copy(
+projectionColumns.stream()
+.map(ProjectionColumn::getColumn)
+.collect(Collectors.toList()));

[jira] [Updated] (FLINK-35736) Add E2e migration scripts for Flink CDC

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35736:
---
Labels: pull-request-available  (was: )

> Add E2e migration scripts for Flink CDC
> ---
>
> Key: FLINK-35736
> URL: https://issues.apache.org/jira/browse/FLINK-35736
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>  Labels: pull-request-available
>
> Currently, there are no E2e migration tests in Flink CDC CI, and it's not very 
> convenient for testers to run migration tests to verify an RC before 
> each release.
> Adding an automated migration testing script could help verify CDC 
> backwards compatibility better.





Re: [PR] [FLINK-35736][tests] Add migration test scripts & CI workflows [flink-cdc]

2024-07-02 Thread via GitHub


lvyanquan commented on code in PR #3447:
URL: https://github.com/apache/flink-cdc/pull/3447#discussion_r1663361163


##
tools/mig-test/README.md:
##
@@ -0,0 +1,36 @@
+# Flink CDC Migration Test Utilities
+
+## Pipeline Jobs
+### Preparation
+
+1. Install Ruby (macOS has embedded it by default)
+2. (Optional) Run `gem install terminal-table` for better display
+
+### Compile snapshot CDC versions
+3. Set `CDC_SOURCE_HOME` to the root directory of the Flink CDC git repository
+4. Run `ruby prepare_libs.rb` to download released / compile snapshot CDC 
versions
+
+### Run migration tests
+5. Enter `conf/` and run `docker compose up -d` to start up test containers
+6. Set `FLINK_HOME` to the home directory of Flink
+7. Run `ruby run_migration_test.rb` to start testing

Review Comment:
   Considering that this is mainly provided for developers and there are not 
many steps involved, I can also accept the current mode.






Re: [PR] [FLINK-35743][cdc-runtime] Fix the time zone configuration for temporal functions is not effective [flink-cdc]

2024-07-02 Thread via GitHub


yuxiqian commented on code in PR #3449:
URL: https://github.com/apache/flink-cdc/pull/3449#discussion_r1663358098


##
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java:
##


Review Comment:
   I could still reproduce the `testTimestampDiffTransform` failure when the 
test time was set to June 1st 04:00. It seems the month and year diffs are not 
0 when the 8-hour gap crosses a month / year boundary, in which case the output 
would be `-28800, -480, -8, 0, [0 or -1], [0 or -1]`.
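
   The boundary effect can be illustrated with a minimal, self-contained sketch. It assumes the month and year differences are derived from calendar fields rather than from counting complete months; the helpers `monthFieldDiff` and `yearFieldDiff` are hypothetical and are not the implementation used in this PR.

```java
import java.time.LocalDateTime;

public class MonthBoundaryDiff {

    // Hypothetical field-based month difference: only (year, month) are compared.
    public static int monthFieldDiff(LocalDateTime from, LocalDateTime to) {
        return (to.getYear() - from.getYear()) * 12
                + (to.getMonthValue() - from.getMonthValue());
    }

    // Hypothetical field-based year difference: only the year is compared.
    public static int yearFieldDiff(LocalDateTime from, LocalDateTime to) {
        return to.getYear() - from.getYear();
    }

    public static void main(String[] args) {
        // An 8-hour shift that crosses a month boundary: June 1st 04:00 -> May 31st 20:00.
        LocalDateTime juneFirst = LocalDateTime.of(2024, 6, 1, 4, 0);
        System.out.println(monthFieldDiff(juneFirst, juneFirst.minusHours(8)));

        // The same shift in the middle of a month stays within the month.
        LocalDateTime midJune = LocalDateTime.of(2024, 6, 15, 12, 0);
        System.out.println(monthFieldDiff(midJune, midJune.minusHours(8)));

        // An 8-hour shift that crosses a year boundary: Jan 1st 04:00 -> Dec 31st 20:00.
        LocalDateTime newYear = LocalDateTime.of(2024, 1, 1, 4, 0);
        System.out.println(yearFieldDiff(newYear, newYear.minusHours(8)));
    }
}
```

   Under that assumption, a test that expects fixed `0, 0` for the last two columns depends on when it runs, so pinning the test clock or asserting on a range would make it stable.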



##
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java:
##
@@ -553,6 +562,26 @@ void testTimestampDiffTransform() throws Exception {
 Assertions.assertThat(
 
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
 .isEqualTo(new StreamRecord<>(insertEventExpect));
+
+DataChangeEvent insertEvent2 =
+DataChangeEvent.insertEvent(
+TIMESTAMPDIFF_TABLEID,
+recordDataGenerator.generate(
+new Object[] {
+new BinaryStringData("2"), null, null, 
null, null, null, null
+}));
+DataChangeEvent insertEventExpect2 =
+DataChangeEvent.insertEvent(
+TIMESTAMPDIFF_TABLEID,
+recordDataGenerator.generate(
+new Object[] {
+new BinaryStringData("2"), -28800, -480, 
-8, 0, 0, 0

Review Comment:
   Ditto.






[jira] [Updated] (FLINK-35638) Modify OceanBase Docker container to make the test cases runnable on non-Linux systems

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35638:
---
Labels: pull-request-available  (was: )

> Modify OceanBase Docker container to make the test cases runnable on 
> non-Linux systems
> --
>
> Key: FLINK-35638
> URL: https://issues.apache.org/jira/browse/FLINK-35638
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: He Wang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35638] Refactor OceanBase test cases and remove dependency on host network [flink-cdc]

2024-07-02 Thread via GitHub


whhe commented on PR #3439:
URL: https://github.com/apache/flink-cdc/pull/3439#issuecomment-2204779815

   @GOODBOY008 @ruanhang1993 PTAL





Re: [PR] [FLINK-35742] Don't create RocksDB Column Family if task cancellation is in progress [flink]

2024-07-02 Thread via GitHub


rkhachatryan commented on PR #25011:
URL: https://github.com/apache/flink/pull/25011#issuecomment-2204412181

   @flinkbot run azure
   please





Re: [PR] [FLINK-35742] Don't create RocksDB Column Family if task cancellation is in progress [flink]

2024-07-02 Thread via GitHub


rkhachatryan commented on PR #25011:
URL: https://github.com/apache/flink/pull/25011#issuecomment-2204034788

   @flinkbot run azure





Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]

2024-07-02 Thread via GitHub


gguptp commented on code in PR #146:
URL: 
https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662905134


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.proxy;
+
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
+import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
+import 
software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Implementation of the {@link StreamProxy} for DynamoDB streams. */
+public class DynamoDbStreamsProxy implements StreamProxy {
+
+private final DynamoDbStreamsClient dynamoDbStreamsClient;
+private final SdkHttpClient httpClient;
+private final Map<String, String> shardIdToIteratorStore;
+
+private static final FullJitterBackoff BACKOFF = new FullJitterBackoff();
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbStreamsProxy.class);
+
+public DynamoDbStreamsProxy(DynamoDbStreamsClient dynamoDbStreamsClient, 
SdkHttpClient httpClient) {
+this.dynamoDbStreamsClient = dynamoDbStreamsClient;
+this.httpClient = httpClient;
+this.shardIdToIteratorStore = new ConcurrentHashMap<>();
+}
+
+@Override
+public List<Shard> listShards(String streamArn, @Nullable String 
lastSeenShardId) {
+return this.getShardsOfStream(streamArn, lastSeenShardId);
+}
+
+@Override
+public GetRecordsResponse getRecords(String streamArn, String shardId, 
StartingPosition startingPosition) {
+String shardIterator =
+shardIdToIteratorStore.computeIfAbsent(
+shardId, (s) -> getShardIterator(streamArn, s, 
startingPosition));
+
+if (shardIterator == null) {
+return null;
+}

Review Comment:
   Today, in the DDB Streams API, the only indication that a shard has ended is 
the next shard iterator being null.
   
   I think the `return null` will only ever be used to signal that the shard 
end has been reached. I don't have any preference between returning a response 
and returning null. 
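
   The shard-end convention described above can be sketched as follows. This 
is a self-contained illustration, not the connector code: `Response` is a 
hypothetical stand-in for the SDK's `GetRecordsResponse`, and the map-based 
fake service and iterator names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ShardEndSketch {

    /** Hypothetical stand-in for GetRecordsResponse; only the two fields used here. */
    static final class Response {
        final List<String> records;
        final String nextShardIterator; // null signals that the shard has ended

        Response(List<String> records, String nextShardIterator) {
            this.records = records;
            this.nextShardIterator = nextShardIterator;
        }
    }

    /** Reads pages from a fake service until a response carries no next iterator. */
    static List<String> drain(Map<String, Response> fakeService, String firstIterator) {
        List<String> out = new ArrayList<>();
        String iterator = firstIterator;
        while (iterator != null) {
            Response response = fakeService.get(iterator);
            out.addAll(response.records);
            iterator = response.nextShardIterator; // null => shard end reached
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Response> fakeService = new HashMap<>();
        fakeService.put("it-0", new Response(Arrays.asList("a", "b"), "it-1"));
        fakeService.put("it-1", new Response(Collections.singletonList("c"), null));
        System.out.println(drain(fakeService, "it-0")); // prints [a, b, c]
    }
}
```

   With this shape the caller always receives a response and checks the next 
iterator itself, which is one alternative to returning null from the proxy.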






Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]

2024-07-02 Thread via GitHub


gguptp commented on code in PR #146:
URL: 
https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662908458


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/FullJitterBackoff.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.proxy;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Random;
+
+/**
+ * Used to calculate full jitter backoff sleep durations.
+ *
+ * @see <a href="https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/">
+ * Exponential Backoff and Jitter </a>
+ */
+@Internal
+public class FullJitterBackoff {
+
+/** Random seed used to calculate backoff jitter for DynamoDb Streams 
operations. */
+private final Random seed = new Random();
+
+/**
+ * Calculates the sleep time for full jitter based on the given parameters.
+ *
+ * @param baseMillis the base backoff time in milliseconds
+ * @param maxMillis the maximum backoff time in milliseconds

Review Comment:
   Yes, we'll be using the AWS SDK retry policy. This was a missed cleanup.
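
   For reference, the full jitter calculation that the Javadoc above describes 
can be sketched like this. It follows the formula from the linked AWS blog 
post; the doubling factor, the `attempt` parameter and the method name are 
assumptions for the sketch, not the signature of the class under review.

```java
import java.util.Random;

final class FullJitterSketch {

    /**
     * Full jitter: a uniformly random sleep in [0, min(maxMillis, baseMillis * 2^attempt)).
     * The base-2 growth and the attempt parameter are assumptions for this sketch.
     */
    static long fullJitterMillis(Random random, long baseMillis, long maxMillis, int attempt) {
        double exponential = baseMillis * Math.pow(2, attempt);
        long cap = (long) Math.min(maxMillis, exponential);
        return (long) (random.nextDouble() * cap);
    }

    public static void main(String[] args) {
        Random random = new Random(42L); // fixed seed only to make the demo repeatable
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + " -> sleep "
                    + fullJitterMillis(random, 100L, 1_000L, attempt) + " ms");
        }
    }
}
```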






Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]

2024-07-02 Thread via GitHub


gguptp commented on code in PR #146:
URL: 
https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662908016


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.proxy;
+
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
+import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
+import 
software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Implementation of the {@link StreamProxy} for DynamoDB streams. */
+public class DynamoDbStreamsProxy implements StreamProxy {
+
+private final DynamoDbStreamsClient dynamoDbStreamsClient;
+private final SdkHttpClient httpClient;
+private final Map<String, String> shardIdToIteratorStore;
+
+private static final FullJitterBackoff BACKOFF = new FullJitterBackoff();
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbStreamsProxy.class);
+
+public DynamoDbStreamsProxy(DynamoDbStreamsClient dynamoDbStreamsClient, 
SdkHttpClient httpClient) {
+this.dynamoDbStreamsClient = dynamoDbStreamsClient;
+this.httpClient = httpClient;
+this.shardIdToIteratorStore = new ConcurrentHashMap<>();
+}
+
+@Override
+public List<Shard> listShards(String streamArn, @Nullable String 
lastSeenShardId) {
+return this.getShardsOfStream(streamArn, lastSeenShardId);
+}
+
+@Override
+public GetRecordsResponse getRecords(String streamArn, String shardId, 
StartingPosition startingPosition) {
+String shardIterator =
+shardIdToIteratorStore.computeIfAbsent(
+shardId, (s) -> getShardIterator(streamArn, s, 
startingPosition));
+
+if (shardIterator == null) {
+return null;
+}
+try {
+GetRecordsResponse getRecordsResponse = getRecords(shardIterator);
+if (getRecordsResponse.nextShardIterator() != null) {
+shardIdToIteratorStore.put(shardId, 
getRecordsResponse.nextShardIterator());
+}
+return getRecordsResponse;
+} catch (ExpiredIteratorException e) {
+// Eagerly retry getRecords() if the iterator is expired
+shardIterator = getShardIterator(streamArn, shardId, 
startingPosition);
+GetRecordsResponse getRecordsResponse = getRecords(shardIterator);
+if (getRecordsResponse.nextShardIterator() != null) {
+shardIdToIteratorStore.put(shardId, 
getRecordsResponse.nextShardIterator());
+}
+return getRecordsResponse;
+}
+}
+
+@Override
+public void close() throws IOException {
+dynamoDbStreamsClient.close();
+httpClient.close();
+}
+
+private List<Shard> getShardsOfStream(
+String streamName, @Nullable String lastSeenShardId) {
+List<Shard> shardsOfStream = new ArrayList<>();
+
+DescribeStreamResponse describeStreamResponse;
+do {
+describeStreamResponse = this.describeStream(streamName, 
lastSeenShardId);
+L

Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]

2024-07-02 Thread via GitHub


gguptp commented on code in PR #146:
URL: 
https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662907223


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.proxy;
+
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
+import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
+import 
software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Implementation of the {@link StreamProxy} for DynamoDB streams. */
+public class DynamoDbStreamsProxy implements StreamProxy {
+
+private final DynamoDbStreamsClient dynamoDbStreamsClient;
+private final SdkHttpClient httpClient;
+private final Map<String, String> shardIdToIteratorStore;
+
+private static final FullJitterBackoff BACKOFF = new FullJitterBackoff();
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbStreamsProxy.class);
+
+public DynamoDbStreamsProxy(DynamoDbStreamsClient dynamoDbStreamsClient, 
SdkHttpClient httpClient) {
+this.dynamoDbStreamsClient = dynamoDbStreamsClient;
+this.httpClient = httpClient;
+this.shardIdToIteratorStore = new ConcurrentHashMap<>();
+}
+
+@Override
+public List<Shard> listShards(String streamArn, @Nullable String 
lastSeenShardId) {
+return this.getShardsOfStream(streamArn, lastSeenShardId);
+}
+
+@Override
+public GetRecordsResponse getRecords(String streamArn, String shardId, 
StartingPosition startingPosition) {
+String shardIterator =
+shardIdToIteratorStore.computeIfAbsent(
+shardId, (s) -> getShardIterator(streamArn, s, 
startingPosition));
+
+if (shardIterator == null) {
+return null;
+}
+try {
+GetRecordsResponse getRecordsResponse = getRecords(shardIterator);
+if (getRecordsResponse.nextShardIterator() != null) {
+shardIdToIteratorStore.put(shardId, 
getRecordsResponse.nextShardIterator());
+}
+return getRecordsResponse;
+} catch (ExpiredIteratorException e) {
+// Eagerly retry getRecords() if the iterator is expired
+shardIterator = getShardIterator(streamArn, shardId, 
startingPosition);
+GetRecordsResponse getRecordsResponse = getRecords(shardIterator);
+if (getRecordsResponse.nextShardIterator() != null) {
+shardIdToIteratorStore.put(shardId, 
getRecordsResponse.nextShardIterator());
+}
+return getRecordsResponse;
+}
+}
+
+@Override
+public void close() throws IOException {
+dynamoDbStreamsClient.close();
+httpClient.close();
+}
+
+private List<Shard> getShardsOfStream(
+String streamName, @Nullable String lastSeenShardId) {
+List<Shard> shardsOfStream = new ArrayList<>();
+
+DescribeStreamResponse describeStreamResponse;
+do {
+describeStreamResponse = this.describeStream(streamName, 
lastSeenShardId);
+L

Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]

2024-07-02 Thread via GitHub


gguptp commented on code in PR #146:
URL: 
https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662903372


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.dynamodb.source.exception.DynamoDbStreamsSourceException;
+import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;
+import 
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
+
+/**
+ * This class is used to discover and assign DynamoDb Streams splits to 
subtasks on the Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class DynamoDbStreamsSourceEnumerator
+implements SplitEnumerator<DynamoDbStreamsShardSplit, DynamoDbStreamsSourceEnumeratorState> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbStreamsSourceEnumerator.class);
+
+private final SplitEnumeratorContext<DynamoDbStreamsShardSplit> context;
+private final String streamArn;
+private final Configuration sourceConfig;
+private final StreamProxy streamProxy;
+private final DynamoDbStreamsShardAssigner shardAssigner;
+private final ShardAssignerContext shardAssignerContext;
+
+private final Map<Integer, Set<DynamoDbStreamsShardSplit>> splitAssignment 
= new HashMap<>();
+private final Set<String> assignedSplitIds = new HashSet<>();
+private final Set<DynamoDbStreamsShardSplit> unassignedSplits;
+
+private String lastSeenShardId;
+
+public DynamoDbStreamsSourceEnumerator(
+SplitEnumeratorContext<DynamoDbStreamsShardSplit> context,
+String streamArn,
+Configuration sourceConfig,
+StreamProxy streamProxy,
+DynamoDbStreamsShardAssigner shardAssigner,
+DynamoDbStreamsSourceEnumeratorState state) {
+this.context = context;
+this.streamArn = streamArn;
+this.sourceConfig = sourceConfig;
+this.streamProxy = streamProxy;
+this.shardAssigner = shardAssigner;
+this.shardAssignerContext = new ShardAssignerContext(splitAssignment, 
context);
+if (state == null) {
+this.lastSeenShardId = null;
+this.unassignedSplits = new HashSet<>();
+} else {
+this.lastSeenShardId = state.getLastSeenShardId();
+this.unassignedSplits = state.getUnassignedSplits();
+}
+}
+
+@Override
+public void start() {
+if (lastSeenShardId == null) {
+context.callAsync(this::initialDiscoverSplits, this::assignSplits);
+}
+
+final long shardDiscoveryInterval = 
sourceConfig.get(SHARD_DISCOVERY_INTERVAL_MILLIS);
+context.callAsync(
+this::periodicallyDiscoverSplits,
+this::assignSplits,
+shardDiscoveryInterval,
+shardDiscoveryInterval);
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, @Nullable String 
reques

Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]

2024-07-02 Thread via GitHub


gguptp commented on code in PR #146:
URL: 
https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662903043


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.dynamodb.source.exception.DynamoDbStreamsSourceException;
+import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;
+import 
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;
+
+/**
+ * This class is used to discover and assign DynamoDb Streams splits to 
subtasks on the Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class DynamoDbStreamsSourceEnumerator
+implements SplitEnumerator<DynamoDbStreamsShardSplit, DynamoDbStreamsSourceEnumeratorState> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbStreamsSourceEnumerator.class);
+
+private final SplitEnumeratorContext<DynamoDbStreamsShardSplit> context;
+private final String streamArn;
+private final Configuration sourceConfig;
+private final StreamProxy streamProxy;
+private final DynamoDbStreamsShardAssigner shardAssigner;
+private final ShardAssignerContext shardAssignerContext;
+
+private final Map<Integer, Set<DynamoDbStreamsShardSplit>> splitAssignment 
= new HashMap<>();
+private final Set<String> assignedSplitIds = new HashSet<>();
+private final Set<DynamoDbStreamsShardSplit> unassignedSplits;
+
+private String lastSeenShardId;
+
+public DynamoDbStreamsSourceEnumerator(
+SplitEnumeratorContext<DynamoDbStreamsShardSplit> context,
+String streamArn,
+Configuration sourceConfig,
+StreamProxy streamProxy,
+DynamoDbStreamsShardAssigner shardAssigner,
+DynamoDbStreamsSourceEnumeratorState state) {
+this.context = context;
+this.streamArn = streamArn;
+this.sourceConfig = sourceConfig;
+this.streamProxy = streamProxy;
+this.shardAssigner = shardAssigner;
+this.shardAssignerContext = new ShardAssignerContext(splitAssignment, 
context);
+if (state == null) {
+this.lastSeenShardId = null;
+this.unassignedSplits = new HashSet<>();
+} else {
+this.lastSeenShardId = state.getLastSeenShardId();
+this.unassignedSplits = state.getUnassignedSplits();
+}
+}
+
+@Override
+public void start() {
+if (lastSeenShardId == null) {
+context.callAsync(this::initialDiscoverSplits, this::assignSplits);
+}
+
+final long shardDiscoveryInterval = 
sourceConfig.get(SHARD_DISCOVERY_INTERVAL_MILLIS);
+context.callAsync(
+this::periodicallyDiscoverSplits,
+this::assignSplits,
+shardDiscoveryInterval,
+shardDiscoveryInterval);
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, @Nullable String 
reques

[jira] [Commented] (FLINK-35695) Release Testing: Verify FLINK-32315: Support local file upload in K8s mode

2024-07-02 Thread Mate Czagany (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862530#comment-17862530
 ] 

Mate Czagany commented on FLINK-35695:
--

h2. Verification Conclusion

Using the steps described below, I could confirm that the new feature works as 
expected and documented.

> Release Testing: Verify FLINK-32315: Support local file upload in K8s mode
> --
>
> Key: FLINK-35695
> URL: https://issues.apache.org/jira/browse/FLINK-35695
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client / Job Submission
>Reporter: Ferenc Csaky
>Assignee: Mate Czagany
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.20.0
>
> Attachments: image-2024-07-01-14-54-17-770.png, 
> image-2024-07-01-15-04-53-764.png
>
>
> Follow-up test for FLINK-32315.
> In Flink 1.20, we introduced local file upload for Kubernetes 
> deployments. To verify this feature, you can check the relevant 
> [PR|https://github.com/apache/flink/pull/24303], which includes the docs and 
> examples for more information.
> To test this feature, you need an available Kubernetes cluster 
> to deploy to, and some DFS where Flink can deploy the local JAR. For a 
> sandbox setup, I recommend installing {{minikube}}. The flink-k8s-operator 
> [quickstart guide|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/#prerequisites] 
> explains that pretty well ({{helm}} is not needed here). For the DFS, I have 
> a gist to set up MinIO on a K8s pod 
> [here|https://gist.github.com/ferenc-csaky/fd7fee71d89cd389cac2da4a4471ab65].
> The following two main use cases should be handled correctly:
> # Deploy job with a local job JAR, but without further dependencies
> {code:bash}
> $ ./bin/flink run-application \
> --target kubernetes-application \
> -Dkubernetes.cluster-id=my-first-application-cluster \
> -Dkubernetes.container.image=flink:1.20 \
> -Dkubernetes.artifacts.local-upload-enabled=true \
> -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
> local:///path/to/TopSpeedWindowing.jar
> {code}
> # Deploy job with a local job JAR, and further dependencies (e.g. a UDF 
> included in a separate JAR).
> {code:bash}
> $ ./bin/flink run-application \
> --target kubernetes-application \
> -Dkubernetes.cluster-id=my-first-application-cluster \
> -Dkubernetes.container.image=flink:1.20 \
> -Dkubernetes.artifacts.local-upload-enabled=true \
> -Dkubernetes.artifacts.local-upload-target=s3://my-bucket/ \
> -Duser.artifacts.artifact-list=local:///tmp/my-flink-udf1.jar\;s3://my-bucket/my-flink-udf2.jar \
> local:///tmp/my-flink-job.jar
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34341] Initial implementation of DynamoDBStreams Source. Marke… [flink-connector-aws]

2024-07-02 Thread via GitHub


gguptp commented on code in PR #146:
URL: 
https://github.com/apache/flink-connector-aws/pull/146#discussion_r1662903713


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java:
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.proxy;
+
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
+import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
+import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
+import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
+import 
software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
+import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Implementation of the {@link StreamProxy} for DynamoDB streams. */
+public class DynamoDbStreamsProxy implements StreamProxy {

Review Comment:
   Good point. Will do






[jira] [Updated] (FLINK-31215) Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-31215:
---
Labels: pull-request-available  (was: )

> Backpropagate processing rate limits from non-scalable bottlenecks to 
> upstream operators
> 
>
> Key: FLINK-31215
> URL: https://issues.apache.org/jira/browse/FLINK-31215
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Artem Plyusnin
>Priority: Major
>  Labels: pull-request-available
>
> The current algorithm scales operators based on input data rates by 
> propagating them forward through the graph.
> However, there are cases where a certain operator's processing capacity is 
> limited, either because it has a set maxParallelism or because the user 
> excludes it from scaling (or the capacity otherwise doesn't increase with scaling).
> In these cases it doesn't make sense to scale upstream operators to the 
> target data rate if the job is going to be bottlenecked by a downstream 
> operator. Instead, we should backpropagate the limit based on the 
> non-scalable bottleneck.
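
The idea in the description above can be sketched roughly as follows. This is a minimal illustration that assumes a simple linear chain of operators, each with a selectivity (output records per input record) and an optional capacity cap; the class, method, and parameter names are hypothetical and are not the autoscaler's actual API.

```java
import java.util.Arrays;

/** Hypothetical illustration only; not the autoscaler's real classes. */
final class RateLimitBackpropagationSketch {

    /**
     * Propagates the target rate forward through a linear chain, then scales every
     * operator's target down by the tightest capacity limit found along the chain.
     *
     * @param sourceRate target input rate of the first operator (records/s)
     * @param selectivity selectivity[i] = output records per input record of operator i
     * @param maxCapacity maxCapacity[i] = highest input rate operator i can reach, or
     *     Double.POSITIVE_INFINITY if it scales freely
     * @return the input rate each operator should be scaled for
     */
    static double[] limitedTargetRates(
            double sourceRate, double[] selectivity, double[] maxCapacity) {
        int n = selectivity.length;
        double[] forward = new double[n];
        double rate = sourceRate;
        double factor = 1.0; // fraction of the target rate the bottleneck allows
        for (int i = 0; i < n; i++) {
            forward[i] = rate;
            if (rate > maxCapacity[i]) {
                factor = Math.min(factor, maxCapacity[i] / rate);
            }
            rate *= selectivity[i];
        }
        // Backpropagation step: apply the bottleneck factor to every operator.
        double[] limited = new double[n];
        for (int i = 0; i < n; i++) {
            limited[i] = forward[i] * factor;
        }
        return limited;
    }

    public static void main(String[] args) {
        double[] rates =
                limitedTargetRates(
                        1000.0,
                        new double[] {1.0, 0.5, 1.0},
                        new double[] {
                            Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, 250.0
                        });
        System.out.println(Arrays.toString(rates));
    }
}
```

In this example the last operator can only take 250 records/s while the forward-propagated rate is 500, so the upstream targets are halved instead of being scaled for a rate the job can never sustain.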





Re: [PR] [FLINK-31215] [autoscaler] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators [flink-kubernetes-operator]

2024-07-02 Thread via GitHub


aplyusnin commented on PR #847:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/847#issuecomment-2203883673

   Hi, @gyfora, could you review the code and run the workflows, please?





[jira] [Updated] (FLINK-35730) PipelineDefinitionParser add parse string method

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35730:
---
Labels: pull-request-available  (was: )

> PipelineDefinitionParser add parse string method
> 
>
> Key: FLINK-35730
> URL: https://issues.apache.org/jira/browse/FLINK-35730
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: Wenkai Qi
>Priority: Not a Priority
>  Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> /**
>  * Parse the specified pipeline definition string, merge global configurations, then generate
>  * the {@link PipelineDef}.
>  */
> PipelineDef parse(String text, Configuration globalPipelineConfig) throws Exception;
>  
> Adding this method lets third-party platforms use this interface to parse 
> strings into a PipelineDef.





Re: [PR] Flink Kubernetes Operator 1.9.0 [flink-web]

2024-07-02 Thread via GitHub


gyfora merged PR #747:
URL: https://github.com/apache/flink-web/pull/747





Re: [PR] Support config yaml [flink-kubernetes-operator]

2024-07-02 Thread via GitHub


ctrlaltdilj commented on PR #848:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/848#issuecomment-2203244208

   Thanks @mateczagany for all the info! I have requested a JIRA account, and 
will file a ticket





Re: [PR] [FLINK-35730][cdc-cli] PipelineDefinitionParser add parse string method [flink-cdc]

2024-07-02 Thread via GitHub


aiwenmo commented on PR #3444:
URL: https://github.com/apache/flink-cdc/pull/3444#issuecomment-2203208303

   @leonardBang PTAL





[PR] [FLINK-35742] Don't create RocksDB CF if task cancellation is in progress [flink]

2024-07-02 Thread via GitHub


rkhachatryan opened a new pull request, #25011:
URL: https://github.com/apache/flink/pull/25011

   We observe a lot of TMs stuck for > 30s in 
RocksDBHandle.registerStateColumnFamilyHandleWithImport, which boils down to 
native calls to create a Column Family. This change prevents CF 
creation if task cancellation is in progress.
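
The guard described above can be sketched as follows. This is a minimal illustration, assuming the cancellation state is available as a simple boolean check; the class and method names are hypothetical and are not taken from the PR or from the RocksDB state backend.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

/** Hypothetical illustration; these names are not taken from the Flink code base. */
final class CancellationGuardSketch {

    /**
     * Runs the (potentially slow, native) creation step only if cancellation has not
     * started; otherwise fails fast without entering the expensive call.
     */
    static <T> T createUnlessCancelled(
            BooleanSupplier cancellationInProgress, Supplier<T> createColumnFamily) {
        if (cancellationInProgress.getAsBoolean()) {
            throw new IllegalStateException(
                    "Task cancellation is in progress; skipping column family creation");
        }
        return createColumnFamily.get();
    }

    public static void main(String[] args) {
        // Not cancelled: the creation step runs and its result is returned.
        System.out.println(createUnlessCancelled(() -> false, () -> "cf-handle"));
    }
}
```

Checking before the call does not interrupt a native call that is already running; it only avoids starting new ones once cancellation has begun.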
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





Re: [PR] [FLINK-35643][doc] Add materialized table statement doc [flink]

2024-07-02 Thread via GitHub


hackergin commented on code in PR #24975:
URL: https://github.com/apache/flink/pull/24975#discussion_r1662619373


##
docs/content/docs/dev/table/materialized-table/quick-start.md:
##
@@ -0,0 +1,333 @@
+---
+title: Quick Start
+weight: 3
+type: docs
+aliases:
+- /dev/table/materialized-table/quick-start.html
+---
+
+
+# Quick Start Guide
+
+This guide will help you quickly understand and get started with materialized 
tables. It includes setting up the environment, creating materialized tables in 
CONTINUOUS mode, and creating materialized tables in FULL mode.
+
+## Environment Setup
+
+### Directory Preparation
+
+**Replace the example paths below with real paths on your machine.**
+
+- Create directories for Catalog Store and Catalog dependencies:
+
+```
+# Directory for File Catalog Store to save catalog information
+mkdir -p /path/to/catalog/store
+
+# Directory for test-filesystem Catalog to save table metadata and table data
+mkdir -p /path/to/catalog/test-filesystem
+
+# Default database for test-filesystem Catalog
+mkdir -p /path/to/catalog/test-filesystem/mydb
+```
+
+- Create directories for Checkpoints and Savepoints to save Checkpoints and 
Savepoints respectively:
+
+```
+mkdir -p /path/to/checkpoint
+
+mkdir -p /path/to/savepoint
+```
+
+### Dependency Preparation
+
+The method here is similar to the steps recorded in [local installation]({{< 
ref "docs/try-flink/local_installation" >}}). Flink can run on any UNIX-like 
operating system, such as Linux, Mac OS X, and Cygwin (for Windows). You need 
to have __Java 11__ installed locally. You can check the installed Java version 
with the following command:

Review Comment:
   Here I referenced the descriptions from other chapters. From my 
understanding, it is to inform users that Java 8 is deprecated, and to 
discourage its use even though it is currently still supported.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35743) The time zone configuration for temporal functions is not effective

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35743:
---
Labels: pull-request-available  (was: )

> The time zone configuration for temporal functions is not effective
> ---
>
> Key: FLINK-35743
> URL: https://issues.apache.org/jira/browse/FLINK-35743
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Wenkai Qi
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.1.1
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The time zone configuration for temporal functions is not effective, 
> including now() and date_format().





[jira] [Updated] (FLINK-35742) Don't create RocksDB CF if task cancellation is in progress

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35742:
---
Labels: pull-request-available  (was: )

> Don't create RocksDB CF if task cancellation is in progress
> ---
>
> Key: FLINK-35742
> URL: https://issues.apache.org/jira/browse/FLINK-35742
> Project: Flink
>  Issue Type: Improvement
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>






Re: [PR] [FLINK-35643][doc] Add materialized table statement doc [flink]

2024-07-02 Thread via GitHub


hackergin commented on code in PR #24975:
URL: https://github.com/apache/flink/pull/24975#discussion_r166970


##
docs/content/docs/dev/table/materialized-table/quick-start.md:
##
@@ -0,0 +1,333 @@
+---
+title: Quick Start
+weight: 3
+type: docs
+aliases:
+- /dev/table/materialized-table/quick-start.html
+---
+
+
+# Quick Start Guide
+
+This guide will help you quickly understand and get started with materialized 
tables. It includes setting up the environment, creating materialized tables in 
CONTINUOUS mode, and creating materialized tables in FULL mode.
+
+## Environment Setup
+
+### Directory Preparation
+
+**Replace the example paths below with real paths on your machine.**
+
+- Create directories for Catalog Store and Catalog dependencies:
+
+```
+# Directory for File Catalog Store to save catalog information
+mkdir -p /path/to/catalog/store
+
+# Directory for test-filesystem Catalog to save table metadata and table data
+mkdir -p /path/to/catalog/test-filesystem
+
+# Default database for test-filesystem Catalog
+mkdir -p /path/to/catalog/test-filesystem/mydb
+```
+
+- Create directories for Checkpoints and Savepoints to save Checkpoints and 
Savepoints respectively:
+
+```
+mkdir -p /path/to/checkpoint
+
+mkdir -p /path/to/savepoint
+```
+
+### Dependency Preparation
+
+The method here is similar to the steps recorded in [local installation]({{< 
ref "docs/try-flink/local_installation" >}}). Flink can run on any UNIX-like 
operating system, such as Linux, Mac OS X, and Cygwin (for Windows). You need 
to have __Java 11__ installed locally. You can check the installed Java version 
with the following command:

Review Comment:
   Yes, but we need to avoid mentioning Java 8 because of 
https://issues.apache.org/jira/browse/FLINK-25247






Re: [PR] [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2. [flink]

2024-07-02 Thread via GitHub


fapaul commented on code in PR #24998:
URL: https://github.com/apache/flink/pull/24998#discussion_r1661069156


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##
@@ -337,6 +337,20 @@ void testChainNodeSetParallelism() {
 assertThat(vertices.get(0).isParallelismConfigured()).isTrue();
 }
 
+@Test
+void testParallelismConfiguredForSinkV2() {

Review Comment:
   Isn't this test better placed in `SinkTransformationTranslatorITCaseBase`?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java:
##
@@ -357,7 +357,9 @@ private  R adjustTransformations(
 // In this case, the subTransformation does not contain any customized
 // parallelism value and will therefore inherit the parallelism value
 // from the sinkTransformation.
-subTransformation.setParallelism(transformation.getParallelism());
+subTransformation.setParallelism(
+transformation.getParallelism(),
+transformation.isParallelismConfigured());

Review Comment:
   I haven't followed the runtime development for a while but is 
`isParallelismConfigured` also applicable for streaming jobs or only batch?






Re: [PR] [FLINK-35743][cdc-runtime] Fix the time zone configuration for temporal functions is not effective [flink-cdc]

2024-07-02 Thread via GitHub


aiwenmo commented on PR #3449:
URL: https://github.com/apache/flink-cdc/pull/3449#issuecomment-2203201015

   @yuxiqian PTAL





Re: [PR] [FLINK-35742] Don't create RocksDB CF if task cancellation is in progress [flink]

2024-07-02 Thread via GitHub


flinkbot commented on PR #25011:
URL: https://github.com/apache/flink/pull/25011#issuecomment-2203191141

   
   ## CI report:
   
   * 30dc775c032759e64215775dba3f7528aa3b5301 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2. [flink]

2024-07-02 Thread via GitHub


JunRuiLee commented on PR #24998:
URL: https://github.com/apache/flink/pull/24998#issuecomment-2203161525

   Squashed the commits and rebased onto master.





Re: [PR] [BP-1.20][FLINK-35734][table] Do not override the user-defined checkpoint interval in continuous mode [flink]

2024-07-02 Thread via GitHub


flinkbot commented on PR #25005:
URL: https://github.com/apache/flink/pull/25005#issuecomment-2200185822

   
   ## CI report:
   
   * 765d5ce23ee57c25cde2ac93c22596eff314e6e7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [BP-1.20][FLINK-35734][table] Do not override the user-defined checkpoint interval in continuous mode [flink]

2024-07-02 Thread via GitHub


hackergin opened a new pull request, #25005:
URL: https://github.com/apache/flink/pull/25005

   ## What is the purpose of the change
   
   Currently, in continuous mode, the checkpoint interval is set based on 
freshness by default. However, if the user explicitly sets a checkpoint 
interval, we should follow the user's setting.
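
The precedence rule described above can be sketched as follows. This is a minimal illustration, assuming the user's setting is modelled as an `Optional` and the default is derived directly from freshness; the class and method names are hypothetical, and the actual change reads these values from the Flink configuration.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical illustration; not the code changed in this PR. */
final class CheckpointIntervalSketch {

    /**
     * Returns the user-defined checkpoint interval if one was set explicitly, and
     * falls back to the freshness-derived default otherwise.
     */
    static Duration effectiveInterval(Optional<Duration> userDefinedInterval, Duration freshness) {
        return userDefinedInterval.orElse(freshness);
    }

    public static void main(String[] args) {
        Duration freshness = Duration.ofMinutes(3);
        // No explicit setting: the freshness-based default applies.
        System.out.println(effectiveInterval(Optional.empty(), freshness));
        // Explicit setting: the user's value wins.
        System.out.println(effectiveInterval(Optional.of(Duration.ofSeconds(30)), freshness));
    }
}
```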
   
   
   ## Brief change log
   
   * Fix periodic refresh job naming.
   * Do not override the user-defined checkpoint interval in continuous mode
   
   
   ## Verifying this change
   
   * Add test 
`testCreateMaterializedTableInContinuousModeWithCustomCheckpointInterval` in 
MaterializedTableStatementITCase to verify that we don't overwrite the user's 
custom checkpoint interval.
   BP 1.20 for https://github.com/apache/flink/pull/25002
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





Re: [PR] Release flink-shaded 19.0 [flink-web]

2024-07-02 Thread via GitHub


MartijnVisser commented on code in PR #749:
URL: https://github.com/apache/flink-web/pull/749#discussion_r1661043966


##
docs/data/additional_components.yml:
##
@@ -33,12 +39,6 @@ flink-shaded-17.0:
   source_release_asc_url: "https://downloads.apache.org/flink/flink-shaded-17.0/flink-shaded-17.0-src.tgz.asc"
   source_release_sha512_url: "https://downloads.apache.org/flink/flink-shaded-17.0/flink-shaded-17.0-src.tgz.sha512"
 
-flink-shaded-16.2:
-  name: "Apache Flink-shaded 16.2 Source Release"
-  source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-shaded-16.2/flink-shaded-16.2-src.tgz"
-  source_release_asc_url: "https://downloads.apache.org/flink/flink-shaded-16.2/flink-shaded-16.2-src.tgz.asc"
-  source_release_sha512_url: "https://downloads.apache.org/flink/flink-shaded-16.2/flink-shaded-16.2-src.tgz.sha512"
-

Review Comment:
   Do we only keep the latest version of shaded, or do we keep all of them? 






[jira] [Updated] (FLINK-35731) Sink V2 operator is mistakenly assumed always to be parallelism configured

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35731:
---
Labels: pull-request-available  (was: )

> Sink V2 operator is mistakenly assumed always to be parallelism configured
> --
>
> Key: FLINK-35731
> URL: https://issues.apache.org/jira/browse/FLINK-35731
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the Sink V2 operator is always marked as parallelism configured, 
> which prevents parallelism from being inferred. This can cause confusion for 
> users utilizing the Adaptive Batch scheduler.
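
The problem described above can be illustrated with a small sketch. It assumes a simplified node type that stores a parallelism value together with a flag saying whether the user set it explicitly; the class and method names are hypothetical stand-ins, not Flink's `Transformation` API.

```java
/** Hypothetical illustration; a simplified stand-in for a transformation node. */
final class ParallelismInheritanceSketch {

    static final class Node {
        private int parallelism = -1;
        private boolean parallelismConfigured;

        void setParallelism(int parallelism, boolean parallelismConfigured) {
            this.parallelism = parallelism;
            this.parallelismConfigured = parallelismConfigured;
        }

        int getParallelism() {
            return parallelism;
        }

        boolean isParallelismConfigured() {
            return parallelismConfigured;
        }
    }

    /**
     * Copies the parallelism from the sink to a sub-node together with the
     * "configured" flag, so an inferred value is not mistaken for a user setting.
     */
    static void inheritParallelism(Node sink, Node subNode) {
        subNode.setParallelism(sink.getParallelism(), sink.isParallelismConfigured());
    }

    public static void main(String[] args) {
        Node sink = new Node();
        sink.setParallelism(4, false); // value present, but not set by the user
        Node subNode = new Node();
        inheritParallelism(sink, subNode);
        System.out.println(subNode.getParallelism() + " " + subNode.isParallelismConfigured());
    }
}
```

If only the value were copied and the flag defaulted to "configured", a scheduler that infers parallelism would treat the sub-node as fixed, which is the behaviour the issue reports.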





Re: [PR] [FLINK-35552][runtime] Moves CheckpointStatsTracker out of DefaultExecutionGraphFactory into Scheduler [flink]

2024-07-02 Thread via GitHub


XComp commented on code in PR #24911:
URL: https://github.com/apache/flink/pull/24911#discussion_r1647485917


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphHandler.java:
##
@@ -81,23 +82,20 @@ public void reportCheckpointMetrics(
 public void reportInitializationMetrics(
 ExecutionAttemptID executionAttemptId,
 SubTaskInitializationMetrics initializationMetrics) {
-if (executionGraph.getCheckpointStatsTracker() == null) {
+final CheckpointCoordinatorConfiguration checkpointConfig =
+executionGraph.getCheckpointCoordinatorConfiguration();
+if (checkpointConfig == null || !checkpointConfig.isCheckpointingEnabled()) {
 // TODO: Consider to support reporting initialization stats without checkpointing
 log.debug(
 "Ignoring reportInitializationMetrics if checkpoint coordinator is not present");

Review Comment:
   Ok, the work I described in [my comment 
above](https://github.com/apache/flink/pull/24911#discussion_r1647484212) makes 
it necessary to verify the code. So, I might fix that along the way as well. :) 






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-02 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1660998380


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,111 @@
+
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+
+flink-connector-aws-e2e-tests-parent
+org.apache.flink
+4.3-SNAPSHOT

Review Comment:
   This should be `4.4`



##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,111 @@
+
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+
+flink-connector-aws-e2e-tests-parent
+org.apache.flink
+4.3-SNAPSHOT

Review Comment:
   This should be `4.4-SNAPSHOT`






[jira] [Updated] (FLINK-35734) Do not override the user-defined checkpoint interval in continuous mode.

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35734:
---
Labels: pull-request-available  (was: )

> Do not override the user-defined checkpoint interval in continuous mode.
> 
>
> Key: FLINK-35734
> URL: https://issues.apache.org/jira/browse/FLINK-35734
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Gateway
>Reporter: Feng Jin
>Assignee: Feng Jin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Currently, in continuous mode, the checkpoint interval is set 
> based on freshness by default. However, if the user explicitly sets a 
> checkpoint interval, we should follow the user's setting.
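
The precedence rule described above can be sketched in a few lines. This is only an illustration of the intended behaviour, not the code from the pull request: the class `CheckpointIntervalResolver` and its `resolve` method are hypothetical names, and the real change lives in the planner and SQL Gateway code paths.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical helper; the names are illustrative and not taken from the Flink code base. */
public final class CheckpointIntervalResolver {

    private CheckpointIntervalResolver() {}

    /**
     * Returns the user-defined checkpoint interval when one is set; otherwise falls back
     * to the interval derived from the materialized table's freshness.
     */
    public static Duration resolve(Optional<Duration> userDefined, Duration freshness) {
        return userDefined.orElse(freshness);
    }

    public static void main(String[] args) {
        Duration freshness = Duration.ofMinutes(3);
        // No explicit setting: the freshness-based default applies.
        System.out.println(resolve(Optional.empty(), freshness));
        // Explicit setting: the user's value is kept and not overridden.
        System.out.println(resolve(Optional.of(Duration.ofSeconds(30)), freshness));
    }
}
```

Modelling the user setting as an `Optional` keeps the "was it explicitly set?" question separate from the value itself, which is the distinction the issue is about.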



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35643][doc] Add materialized table statement doc [flink]

2024-07-02 Thread via GitHub


lsyldliu commented on code in PR #24975:
URL: https://github.com/apache/flink/pull/24975#discussion_r1660970466


##
docs/content/docs/dev/table/materialized-table/syntax.md:
##
@@ -0,0 +1,337 @@
+---
+title: Syntax

Review Comment:
   Statements



##
docs/content/docs/dev/table/materialized-table/quick-start.md:
##
@@ -0,0 +1,333 @@
+---
+title: Quick Start
+weight: 3
+type: docs
+aliases:
+- /dev/table/materialized-table/quick-start.html
+---
+
+
+# Quick Start Guide
+
+This guide will help you quickly understand and get started with materialized 
tables. It includes setting up the environment, creating materialized tables in 
CONTINUOUS mode, and creating materialized tables in FULL mode.

Review Comment:
   This guide will help you quickly understand and get started with 
materialized tables. It includes setting up the environment and creating, 
altering, and dropping materialized tables in CONTINUOUS and FULL mode.
   



##
docs/content/docs/dev/table/materialized-table/quick-start.md:
##
@@ -0,0 +1,333 @@
+---
+title: Quick Start
+weight: 3
+type: docs
+aliases:
+- /dev/table/materialized-table/quick-start.html

Review Comment:
   quick-start -> quickstart



##
docs/content/docs/dev/table/materialized-table/quick-start.md:
##
@@ -0,0 +1,333 @@
+---
+title: Quick Start
+weight: 3
+type: docs
+aliases:
+- /dev/table/materialized-table/quick-start.html
+---
+
+
+# Quick Start Guide
+
+This guide will help you quickly understand and get started with materialized 
tables. It includes setting up the environment, creating materialized tables in 
CONTINUOUS mode, and creating materialized tables in FULL mode.
+
+## Environment Setup
+
+### Directory Preparation
+
+**Replace the example paths below with real paths on your machine.**
+
+- Create directories for Catalog Store and Catalog dependencies:
+
+```
+# Directory for File Catalog Store to save catalog information
+mkdir -p /path/to/catalog/store
+
+# Directory for test-filesystem Catalog to save table metadata and table data
+mkdir -p /path/to/catalog/test-filesystem
+
+# Default database for test-filesystem Catalog
+mkdir -p /path/to/catalog/test-filesystem/mydb
+```
+
+- Create directories for Checkpoints and Savepoints to save Checkpoints and 
Savepoints respectively:
+
+```
+mkdir -p /path/to/checkpoint
+
+mkdir -p /path/to/savepoint
+```
+
+### Dependency Preparation
+
+The method here is similar to the steps recorded in [local installation]({{< 
ref "docs/try-flink/local_installation" >}}). Flink can run on any UNIX-like 
operating system, such as Linux, Mac OS X, and Cygwin (for Windows). You need 
to have __Java 11__ installed locally. You can check the installed Java version 
with the following command:
+
+```
+java -version
+```
+
+Next, [download](https://flink.apache.org/downloads/) the latest Flink binary 
package and extract it:
+
+```
+tar -xzf flink-*.tgz
+```
+
+Download the 
[test-filesystem](https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-filesystem-test-utils/)
 connector and place it in the lib directory.
+
+```
+cp flink-table-filesystem-test-utils-{VERSION}.jar flink-*/lib/
+```
+
+### Configuration Preparation
+
+Edit the config.yaml file and add the following configurations:
+
+```yaml
+execution:
+  checkpoints:
+    dir: file:///path/to/savepoint
+
+# Configure file catalog
+table:
+  catalog-store:
+    kind: file
+    file:
+      path: /path/to/catalog/store
+
+# Configure embedded scheduler
+workflow-scheduler:
+  type: embedded
+
+# Configure SQL gateway address and port
+sql-gateway:
+  endpoint:
+    rest:
+      address: 127.0.0.1
+      port: 8083
+```
+
+### Start Flink Cluster
+
+Run the following script to start the cluster locally:
+
+```
+./bin/start-cluster.sh
+```
+
+### Start SQL Gateway
+
+Run the following script to start the SQL Gateway locally:
+
+```
+./sql-gateway.sh start
+```
+
+### Start SQL Client
+
+Run the following script to start the SQL Client locally:
+
+```
+./sql-client.sh gateway --endpoint http://127.0.0.1:8083

Review Comment:
   ./bin/



##
docs/content/docs/dev/table/materialized-table/quick-start.md:
##
@@ -0,0 +1,333 @@
+---
+title: Quick Start
+weight: 3
+type: docs
+aliases:
+- /dev/table/materialized-table/quick-start.html
+---
+
+
+# Quick Start Guide
+
+This guide will help you quickly understand and get started with materialized 
tables. It includes setting up the environment, creating materialized tables in 
CONTINUOUS mode, and creating materialized tables in FULL mode.
+
+## Environment Setup
+
+### Directory Preparation
+
+**Replace the example paths below with real paths on your machine.**
+
+- Create directories for Catalog Store and Catalog dependencies:
+
+```
+# Directory for File Catalog Store to save catalog information
+mkdir -p /path/to/catalog/store
+
+# Directory for test-filesystem Catalog to save table metadata and table data
+mkdir -p /path/to/catalog/

Re: [PR] [hotfix][parquet] fix dependences and variable name problems in FLINK-35702 [flink]

2024-07-02 Thread via GitHub


flinkbot commented on PR #25004:
URL: https://github.com/apache/flink/pull/25004#issuecomment-2200079714

   
   ## CI report:
   
   * 9fba47c2d98e83aeebc51347847f14ee7f067445 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [hotfix][parquet] fix dependences and variable name problems in FLINK-35702 [flink]

2024-07-02 Thread via GitHub


Stephen0421 opened a new pull request, #25004:
URL: https://github.com/apache/flink/pull/25004

   
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - fix parquet flink-runtime dependency version
 - optimize put null flags logics
 - relocation exclude three flink-runtime class.
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   
   This change is already covered by existing tests, such as 
org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReaderTest
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper:no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   





Re: [PR] [FLINK-35734][table] Do not override the user-defined checkpoint interval in continuous mode. [flink]

2024-07-02 Thread via GitHub


lsyldliu merged PR #25002:
URL: https://github.com/apache/flink/pull/25002





Re: [PR] [FLINK-35702][format][parquet] support parquet read nested. [flink]

2024-07-02 Thread via GitHub


Stephen0421 commented on PR #24991:
URL: https://github.com/apache/flink/pull/24991#issuecomment-2197896793

   > @Stephen0421, could you, please, look at this issue 
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-35698 It seems to be 
connected to your work. There is a problem Parquet reading nested data when 
field type is DECIMAL. I provided a test case there.
   
   Hello @empathy87, I ran your test case on my local PR branch and it ran 
successfully. I think the PR has solved the problem.





Re: [PR] [FLINK-35702][format][parquet] support parquet read nested. [flink]

2024-07-02 Thread via GitHub


Stephen0421 commented on code in PR #24991:
URL: https://github.com/apache/flink/pull/24991#discussion_r1659515022


##
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java:
##
@@ -578,4 +569,167 @@ public static WritableColumnVector 
createWritableColumnVector(
 throw new UnsupportedOperationException(fieldType + " is not 
supported now.");
 }
 }
+
+public static List<ParquetField> buildFieldsList(
+List<RowType.RowField> childrens, List<String> fieldNames, 
MessageColumnIO columnIO) {
+List<ParquetField> list = new ArrayList<>();
+for (int i = 0; i < childrens.size(); i++) {
+list.add(
+constructField(
+childrens.get(i), lookupColumnByName(columnIO, 
fieldNames.get(i))));
+}
+return list;
+}
+
+private static ParquetField constructField(RowType.RowField rowField, 
ColumnIO columnIO) {
+boolean required = columnIO.getType().getRepetition() == REQUIRED;
+int repetitionLevel = columnIO.getRepetitionLevel();
+int definitionLevel = columnIO.getDefinitionLevel();
+LogicalType type = rowField.getType();
+String filedName = rowField.getName();
+if (type instanceof RowType) {
+GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+RowType rowType = (RowType) type;
+ImmutableList.Builder<ParquetField> fieldsBuilder = 
ImmutableList.builder();
+List<String> fieldNames = rowType.getFieldNames();
+List<RowType.RowField> childrens = rowType.getFields();
+for (int i = 0; i < childrens.size(); i++) {
+fieldsBuilder.add(
+constructField(
+childrens.get(i),
+lookupColumnByName(groupColumnIO, 
fieldNames.get(i))));
+}
+
+return new ParquetGroupField(
+type, repetitionLevel, definitionLevel, required, 
fieldsBuilder.build());
+}
+
+if (type instanceof MapType) {
+GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+GroupColumnIO keyValueColumnIO = 
getMapKeyValueColumn(groupColumnIO);
+MapType mapType = (MapType) type;
+ParquetField keyField =
+constructField(
+new RowType.RowField("", mapType.getKeyType()),
+keyValueColumnIO.getChild(0));
+ParquetField valueField =
+constructField(
+new RowType.RowField("", mapType.getValueType()),
+keyValueColumnIO.getChild(1));
+return new ParquetGroupField(
+type,
+repetitionLevel,
+definitionLevel,
+required,
+ImmutableList.of(keyField, valueField));
+}
+
+if (type instanceof MultisetType) {
+GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+GroupColumnIO keyValueColumnIO = 
getMapKeyValueColumn(groupColumnIO);
+MultisetType multisetType = (MultisetType) type;
+ParquetField keyField =
+constructField(
+new RowType.RowField("", 
multisetType.getElementType()),
+keyValueColumnIO.getChild(0));
+ParquetField valueField =
+constructField(
+new RowType.RowField("", new IntType()), 
keyValueColumnIO.getChild(1));
+return new ParquetGroupField(
+type,
+repetitionLevel,
+definitionLevel,
+required,
+ImmutableList.of(keyField, valueField));
+}
+
+if (type instanceof ArrayType) {
+ArrayType arrayType = (ArrayType) type;
+ColumnIO elementTypeColumnIO;
+if (columnIO instanceof GroupColumnIO) {
+GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+if (!StringUtils.isNullOrWhitespaceOnly(filedName)) {
+while (!Objects.equals(groupColumnIO.getName(), 
filedName)) {
+groupColumnIO = (GroupColumnIO) 
groupColumnIO.getChild(0);
+}
+elementTypeColumnIO = groupColumnIO;
+} else {
+if (arrayType.getElementType() instanceof RowType) {
+elementTypeColumnIO = groupColumnIO;
+} else {
+elementTypeColumnIO = groupColumnIO.getChild(0);
+}
+}
+} else if (columnIO instanceof PrimitiveColumnIO) {
+elementTypeColumnIO = columnIO;
+} else {
+throw new RuntimeException(String.format("Unkown ColumnIO, 
%s", columnIO));
+ 

Re: [PR] [FLINK-35357][docs] Add kubernetes.operator.plugins.listeners config [flink-kubernetes-operator]

2024-07-02 Thread via GitHub


1996fanrui commented on code in PR #845:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/845#discussion_r1659491268


##
docs/layouts/shortcodes/generated/dynamic_section.html:
##
@@ -182,5 +182,11 @@
 Map
 Custom HTTP header for HttpArtifactFetcher. The header will be 
applied when getting the session job artifacts. Expected format: 
headerKey1:headerValue1,headerKey2:headerValue2.
 
+
+
kubernetes.operator.plugins.listeners..class

Review Comment:
   Actually, these HTML files are generated rather than edited manually.
   
   As I understand it, you need to update `KubernetesOperatorConfigOptions` 
and run `mvn clean install -DskipTests -Pgenerate-docs`.






Re: [PR] [FLINK-35654][docs] Add CDC release verification & Installation guide [flink-cdc]

2024-07-02 Thread via GitHub


yuxiqian commented on PR #3430:
URL: https://github.com/apache/flink-cdc/pull/3430#issuecomment-2197864754

   Thanks for @morazow's comments! I addressed most of them and left several 
that require further discussion.





[jira] [Updated] (FLINK-35357) Add "kubernetes.operator.plugins.listeners" parameter description to the Operator configuration document

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35357:
---
Labels: pull-request-available  (was: )

> Add "kubernetes.operator.plugins.listeners" parameter description to the 
> Operator configuration document
> 
>
> Key: FLINK-35357
> URL: https://issues.apache.org/jira/browse/FLINK-35357
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Yang Zhou
>Assignee: Yang Zhou
>Priority: Minor
>  Labels: pull-request-available
>
> In Flink Operator "Custom Flink Resource Listeners" in practice (doc: 
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/operations/plugins/#custom-flink-resource]
>  -listeners)
> It was found that the "Operator Configuration Reference" document did not 
> explain the "Custom Flink Resource Listeners" configuration parameters.
> So I wanted to come up with adding:
> kubernetes.operator.plugins.listeners..class: 
> 
> , after all it is useful.
> I want to submit a PR to optimize the document.





[jira] [Updated] (FLINK-35654) Add Flink CDC verification guide in docs

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35654:
---
Labels: pull-request-available  (was: )

> Add Flink CDC verification guide in docs
> 
>
> Key: FLINK-35654
> URL: https://issues.apache.org/jira/browse/FLINK-35654
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>  Labels: pull-request-available
>
> Currently, ASF voting process requires vast quality verification before 
> releasing any new versions, including:
>  * Tarball checksum verification
>  * Compile from source code
>  * Run pipeline E2e tests
>  * Run migration tests
>  * Check if jar was packaged with correct JDK version
>  * ...
> Adding verification guide in Flink CDC docs should help developers verify 
> future releases more easily.
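
As an illustration of the first item in the list above, tarball checksum verification can also be scripted on the JVM. The sketch below is an assumption-laden example, not part of the proposed guide: the class name `Sha512Check` is made up, and it expects the published digest as a bare hex string (a `.sha512` file that also contains the file name would need to be trimmed to the hash first).

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Illustrative helper; the class and method names are not part of Flink CDC. */
public final class Sha512Check {

    private Sha512Check() {}

    /** Streams the file through SHA-512 and returns the lowercase hex digest. */
    public static String sha512Hex(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-512");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    /** Compares the computed digest with the published one, ignoring case and whitespace. */
    public static boolean matches(Path file, String expectedHex)
            throws IOException, NoSuchAlgorithmException {
        return sha512Hex(file).equalsIgnoreCase(expectedHex.trim());
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("usage: java Sha512Check <tarball> <expected-sha512-hex>");
            System.exit(2);
        }
        boolean ok = matches(Paths.get(args[0]), args[1]);
        System.out.println(ok ? "checksum OK" : "checksum MISMATCH");
        System.exit(ok ? 0 : 1);
    }
}
```

Reading the file in fixed-size chunks keeps memory use flat regardless of the size of the release tarball.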





Re: [PR] [FLINK-35654][docs] Add CDC release verification & Installation guide [flink-cdc]

2024-07-02 Thread via GitHub


yuxiqian commented on code in PR #3430:
URL: https://github.com/apache/flink-cdc/pull/3430#discussion_r1659427878


##
docs/content/docs/developer-guide/contribute-to-flink-cdc.md:
##
@@ -82,3 +83,56 @@ not need a long description.
 3. Are the Documentation Updated?
 
 If the pull request introduces a new feature, the feature should be documented.
+
+Release Verification Guide
+
+We will prepare for new releases of Flink CDC regularly.
+
+According to the Apache Software Foundation releasing SOP,
+we will make a release candidate version before each release,
+and invite community members to test and vote on this pre-release version.
+
+Everyone is welcomed to participate in the version verification work in 
`d...@flink.apache.org` mailing list.
+The verification content may include the following aspects:
+
+1. Verify if source code could be compiled successfully.
+
+Currently, Flink CDC uses [Maven](https://maven.apache.org/) 3 as the build 
tool and compiles on the JDK 8 platform.

Review Comment:
   Well it seems a little arbitrary. I could notice that CI uses Maven 3.8.6 
but latest 3.1.1 release was compiled with 3.9.1. Not sure if all Maven 3+ 
works...



##
docs/content/docs/developer-guide/contribute-to-flink-cdc.md:
##
@@ -82,3 +83,56 @@ not need a long description.
 3. Are the Documentation Updated?
 
 If the pull request introduces a new feature, the feature should be documented.
+
+Release Verification Guide
+
+We will prepare for new releases of Flink CDC regularly.
+
+According to the Apache Software Foundation releasing SOP,
+we will make a release candidate version before each release,
+and invite community members to test and vote on this pre-release version.
+
+Everyone is welcomed to participate in the version verification work in 
`d...@flink.apache.org` mailing list.
+The verification content may include the following aspects:
+
+1. Verify if source code could be compiled successfully.
+
+Currently, Flink CDC uses [Maven](https://maven.apache.org/) 3 as the build 
tool and compiles on the JDK 8 platform.

Review Comment:
   Well seems Maven version isn't always consistent. I noticed that CI uses 
Maven 3.8.6 but latest 3.1.1 release was compiled with 3.9.1. Not sure if all 
Maven 3+ works.



##
docs/content/docs/developer-guide/contribute-to-flink-cdc.md:
##
@@ -82,3 +83,56 @@ not need a long description.
 3. Are the Documentation Updated?
 
 If the pull request introduces a new feature, the feature should be documented.
+
+Release Verification Guide
+
+We will prepare for new releases of Flink CDC regularly.
+
+According to the Apache Software Foundation releasing SOP,
+we will make a release candidate version before each release,
+and invite community members to test and vote on this pre-release version.
+
+Everyone is welcomed to participate in the version verification work in 
`d...@flink.apache.org` mailing list.
+The verification content may include the following aspects:
+
+1. Verify if source code could be compiled successfully.
+
+Currently, Flink CDC uses [Maven](https://maven.apache.org/) 3 as the build 
tool and compiles on the JDK 8 platform.
+You can download the RC version of the source code package and compile it 
using the `mvn clean package -Dfast` command,
+and check if there's any unexpected errors or warnings.
+
+2. Verify if tarball checksum matches.
+
+To ensure the genuinity and integrity of released binary packages, a SHA512 
hash value of the corresponding file is attached to any released binary tarball 
so that users can verify the integrity.
+You can download the binary tarball of the RC version and calculate its SHA512 
hash value with the following command:
+
+* Linux: `sha512sum flink-cdc-*-bin.tar.gz`
+* macOS: `shasum -a 512 flink-cdc-*-bin.tar.gz`
+* Windows (PowerShell): `Get-FileHash flink-cdc-*-bin.tar.gz -Algorithm SHA512 
| Format-List`
+
+3. Verify that the binary package was compiled with JDK 8.
+
+Unpack the precompiled binary jar package and check if the `Build-Jdk` entry 
in the `META-INF\MANIFEST.MF` file is correct.
+
+4. Run migration tests.
+
+Flink CDC tries to ensure backward compatibility of state, that is, the job 
state (Checkpoint/Savepoint) saved with previous CDC version should be usable 
in the new version.
+You can run CDC migration verification locally with [Flink CDC Migration Test 
Utils](https://github.com/yuxiqian/migration-test) script.
+
+* [Pipeline Job Migration Test 
Guide](https://github.com/yuxiqian/migration-test/blob/main/README.md)
+* [DataStream Job Migration Test 
Guide](https://github.com/yuxiqian/migration-test/blob/main/datastream/README.md)

Review Comment:
   I would prefer putting it in the CDC repository along with the existing CI 
workflows, since it isn't significant enough to get an independent repo. But 
first I need to clean up the scripts before they can be accepted by any 
upstream repository.
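
Step 3 in the diff above (checking the `Build-Jdk` entry of `META-INF/MANIFEST.MF`) does not require unpacking the jar by hand. A minimal sketch using only the JDK's `java.util.jar` API follows; the class name `BuildJdkCheck` is hypothetical, and whether a given artifact carries `Build-Jdk` or only the newer `Build-Jdk-Spec` entry depends on the Maven plugin versions used for the build, so the helper simply returns `null` when the entry is absent.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

/** Illustrative helper; the class name is an assumption, not part of Flink CDC. */
public final class BuildJdkCheck {

    private BuildJdkCheck() {}

    /** Returns the Build-Jdk main attribute of the jar's manifest, or null if it is missing. */
    public static String buildJdk(Path jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            Manifest manifest = jarFile.getManifest();
            if (manifest == null) {
                return null;
            }
            return manifest.getMainAttributes().getValue("Build-Jdk");
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java BuildJdkCheck <path-to-jar>");
            System.exit(2);
        }
        System.out.println("Build-Jdk: " + buildJdk(Paths.get(args[0])));
    }
}
```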



##
docs/content/docs/developer-guide/contribute-to

Re: [PR] [FLINK-35654][docs] Add CDC release verification & Installation guide [flink-cdc]

2024-07-02 Thread via GitHub


yuxiqian commented on code in PR #3430:
URL: https://github.com/apache/flink-cdc/pull/3430#discussion_r1659416389


##
README.md:
##
@@ -27,7 +27,15 @@ full database synchronization, sharding table 
synchronization, schema evolution
 
 ![Flink CDC framework desigin](docs/static/fig/architecture.png)
 
+### Installation
 
+* Flink CDC tar could be downloaded from [Apache Flink 
Website](https://flink.apache.org/downloads/#apache-flink-cdc) or [GitHub 
Release Page](https://github.com/apache/flink-cdc/releases).
+* Pipeline and source connectors could be downloaded from [Maven Central 
Repository](https://mvnrepository.com/artifact/org.apache.flink) or [GitHub 
Release Page](https://github.com/apache/flink-cdc/releases).
+* If you're using Linux or macOS, you may install Flink CDC and connectors 
with Homebrew:

Review Comment:
   Homebrew is also available on Linux (rarely used though). Currently, Flink 
CDC isn't provided in any other package managers. I could remove Linux here if 
it's confusing.






[PR] [WIP] [FLINK-XXXX] [core] WIP Watermarking [flink]

2024-07-02 Thread via GitHub


jeyhunkarimov opened a new pull request, #24995:
URL: https://github.com/apache/flink/pull/24995

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





Re: [PR] [WIP] [FLINK-XXXX] [core] WIP Watermarking [flink]

2024-07-02 Thread via GitHub


flinkbot commented on PR #24995:
URL: https://github.com/apache/flink/pull/24995#issuecomment-2197770924

   
   ## CI report:
   
   * b739e000f1ad7e9ce6f1dcbb3fbd2bfb27016083 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [WIP] [FLINK-XXXX] [core] WIP Watermarking [flink]

2024-07-02 Thread via GitHub


jeyhunkarimov closed pull request #24995: [WIP] [FLINK-] [core] WIP 
Watermarking
URL: https://github.com/apache/flink/pull/24995





[jira] [Updated] (FLINK-35702) Support Parquet Nested Read

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35702:
---
Labels: pull-request-available  (was: )

> Support Parquet Nested Read
> ---
>
> Key: FLINK-35702
> URL: https://issues.apache.org/jira/browse/FLINK-35702
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Wenchao Wu
>Assignee: Wenchao Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Currently, Flink Parquet does not support reading nested columns such as Array, 
> Array,Array in the vectorized reader. This feature aims to solve this 
> problem.





Re: [PR] [FLINK-35553][runtime] Wires up the RescaleManager with the CheckpointLifecycleListener interface [flink]

2024-07-02 Thread via GitHub


XComp commented on PR #24912:
URL: https://github.com/apache/flink/pull/24912#issuecomment-2197023625

   I addressed the last two comments and rebased the branch onto the most recent 
version of parent PR #24911. That way we also have the CI debug commit included.





Re: [PR] [FLINK-35702][format][parquet] support parquet read nested. [flink]

2024-07-02 Thread via GitHub


empathy87 commented on PR #24991:
URL: https://github.com/apache/flink/pull/24991#issuecomment-2197002034

   @Stephen0421, could you, please, look at this issue 
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-35698
   It seems to be connected to your work. There is a problem Parquet reading 
nested data when field type is DECIMAL. I provided a test case there.





Re: [PR] [FLINK-35552][runtime] Moves CheckpointStatsTracker out of DefaultExecutionGraphFactory into Scheduler [flink]

2024-07-02 Thread via GitHub


ztison commented on code in PR #24911:
URL: https://github.com/apache/flink/pull/24911#discussion_r1657013866


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java:
##
@@ -214,402 +60,49 @@ void reportRestoredCheckpoint(RestoredCheckpointStats 
restored) {
 restored.getStateSize());
 }
 
-public void reportRestoredCheckpoint(
+void reportRestoredCheckpoint(
 long checkpointID,
 CheckpointProperties properties,
 String externalPath,
-long stateSize) {
-statsReadWriteLock.lock();
-try {
-counts.incrementRestoredCheckpoints();
-checkState(
-jobInitializationMetricsBuilder.isPresent(),
-"JobInitializationMetrics should have been set first, 
before RestoredCheckpointStats");
-jobInitializationMetricsBuilder
-.get()
-.setRestoredCheckpointStats(checkpointID, stateSize, 
properties, externalPath);
-dirty = true;
-} finally {
-statsReadWriteLock.unlock();
-}
-}
+long stateSize);
 
 /**
  * Callback when a checkpoint completes.
  *
  * @param completed The completed checkpoint stats.
  */
-void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
-statsReadWriteLock.lock();
-try {
-latestCompletedCheckpoint = completed;
+void reportCompletedCheckpoint(CompletedCheckpointStats completed);
 
-counts.incrementCompletedCheckpoints();
-history.replacePendingCheckpointById(completed);
+PendingCheckpointStats getPendingCheckpointStats(long checkpointId);
 
-summary.updateSummary(completed);
+void reportIncompleteStats(
+long checkpointId, ExecutionAttemptID attemptId, CheckpointMetrics 
metrics);
 
-dirty = true;
-logCheckpointStatistics(completed);
-} finally {
-statsReadWriteLock.unlock();
-}
-}
+void reportInitializationStarted(
+Set<ExecutionAttemptID> toInitialize, long initializationStartTs);
 
-/**
- * Callback when a checkpoint fails.
- *
- * @param failed The failed checkpoint stats.
- */
-void reportFailedCheckpoint(FailedCheckpointStats failed) {
-statsReadWriteLock.lock();
-try {
-counts.incrementFailedCheckpoints();
-history.replacePendingCheckpointById(failed);
-
-dirty = true;
-logCheckpointStatistics(failed);
-} finally {
-statsReadWriteLock.unlock();
-}
-}
-
-private void logCheckpointStatistics(AbstractCheckpointStats 
checkpointStats) {
-try {
-metricGroup.addSpan(
-Span.builder(CheckpointStatsTracker.class, "Checkpoint")
-
.setStartTsMillis(checkpointStats.getTriggerTimestamp())
-
.setEndTsMillis(checkpointStats.getLatestAckTimestamp())
-.setAttribute("checkpointId", 
checkpointStats.getCheckpointId())
-.setAttribute("fullSize", 
checkpointStats.getStateSize())
-.setAttribute("checkpointedSize", 
checkpointStats.getCheckpointedSize())
-.setAttribute("checkpointStatus", 
checkpointStats.getStatus().name()));
-if (LOG.isDebugEnabled()) {
-StringWriter sw = new StringWriter();
-MAPPER.writeValue(
-sw,
-
CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true));
-String jsonDump = sw.toString();
-LOG.debug(
-"CheckpointStatistics (for jobID={}, checkpointId={}) 
dump = {} ",
-metricGroup.jobId(),
-checkpointStats.checkpointId,
-jsonDump);
-}
-} catch (Exception ex) {
-LOG.warn("Fail to log CheckpointStatistics", ex);
-}
-}
-
-/**
- * Callback when a checkpoint failure without in progress checkpoint. For 
example, it should be
- * callback when triggering checkpoint failure before creating 
PendingCheckpoint.
- */
-public void reportFailedCheckpointsWithoutInProgress() {
-statsReadWriteLock.lock();
-try {
-counts.incrementFailedCheckpointsWithoutInProgress();
-
-dirty = true;
-} finally {
-statsReadWriteLock.unlock();
-}
-}
-
-public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) 
{
-statsReadWriteLock.lock();
-try {
-AbstractCheckpointStats stats = 
history.getCheckpointById(checkpointId);
-return stats instanceof PendingCheckpointStats ? 
(PendingCheckpointStat

Re: [PR] [FLINK-35551][runtime] Introduces RescaleManager#onTrigger endpoint [flink]

2024-07-02 Thread via GitHub


XComp commented on PR #24910:
URL: https://github.com/apache/flink/pull/24910#issuecomment-2194727428

   [CI 
failure](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60525&view=logs&j=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5&t=54421a62-0c80-5aad-3319-094ff69180bb&l=10037)
 due to [FLINK-30719](https://issues.apache.org/jira/browse/FLINK-30719) 
...again 🙄 





Re: [PR] [FLINK-35551][runtime] Introduces RescaleManager#onTrigger endpoint [flink]

2024-07-02 Thread via GitHub


XComp commented on PR #24910:
URL: https://github.com/apache/flink/pull/24910#issuecomment-2194727822

   @flinkbot run azure





Re: [PR] Flink 33386 review splited commits for ci [flink]

2024-07-02 Thread via GitHub


RocMarshal commented on PR #24229:
URL: https://github.com/apache/flink/pull/24229#issuecomment-2194703794

   @flinkbot run azure





Re: [PR] [FLINK-35344][cdc-base] Move same code from multiple subclasses to JdbcSourceChunkSplitter [flink-cdc]

2024-07-02 Thread via GitHub


loserwang1024 commented on code in PR #3319:
URL: https://github.com/apache/flink-cdc/pull/3319#discussion_r1657064426


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##
@@ -132,16 +172,237 @@ default boolean isEvenlySplitColumn(Column splitColumn) {
  * @param splitColumn dbz split column.
  * @return flink data type
  */
-DataType fromDbzColumn(Column splitColumn);
+protected abstract DataType fromDbzColumn(Column splitColumn);
+
+/** Returns the distribution factor of the given table. */
+protected double calculateDistributionFactor(
+TableId tableId, Object min, Object max, long approximateRowCnt) {
+
+if (!min.getClass().equals(max.getClass())) {
+throw new IllegalStateException(
+String.format(
+"Unsupported operation type, the MIN value type %s 
is different with MAX value type %s.",
+min.getClass().getSimpleName(), 
max.getClass().getSimpleName()));
+}
+if (approximateRowCnt == 0) {
+return Double.MAX_VALUE;
+}
+BigDecimal difference = ObjectUtils.minus(max, min);
+// factor = (max - min + 1) / rowCount
+final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
+double distributionFactor =
+subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, 
ROUND_CEILING).doubleValue();
+LOG.info(
+"The distribution factor of table {} is {} according to the 
min split key {}, max split key {} and approximate row count {}",
+tableId,
+distributionFactor,
+min,
+max,
+approximateRowCnt);
+return distributionFactor;
+}
+
+/**
+ * Get the column which is seen as chunk key.
+ *
+ * @param table table identity.
+ * @param chunkKeyColumn column name which is seen as chunk key, if 
chunkKeyColumn is null, use
+ * primary key instead. @Column the column which is seen as chunk key.
+ */
+protected Column getSplitColumn(Table table, @Nullable String 
chunkKeyColumn) {
+return JdbcChunkUtils.getSplitColumn(table, chunkKeyColumn);
+}
+
+/** ChunkEnd less than or equal to max. */
+protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
+return ObjectUtils.compare(chunkEnd, max) <= 0;
+}
+
+/** ChunkEnd greater than or equal to max. */
+protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
+return ObjectUtils.compare(chunkEnd, max) >= 0;
+}
 
 /**
  * convert dbz column to Flink row type.
  *
  * @param splitColumn split column.
  * @return flink row type.
  */
-default RowType getSplitType(Column splitColumn) {
+private RowType getSplitType(Column splitColumn) {
 return (RowType)
 ROW(FIELD(splitColumn.name(), 
fromDbzColumn(splitColumn))).getLogicalType();
 }
+
+/**
+ * We can use evenly-sized chunks or unevenly-sized chunks when split 
table into chunks, using
+ * evenly-sized chunks which is much efficient, using unevenly-sized 
chunks which will request
+ * many queries and is not efficient.
+ */
+private List<ChunkRange> splitTableIntoChunks(
+JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
+final String splitColumnName = splitColumn.name();
+final Object[] minMax = JdbcChunkUtils.queryMinMax(jdbc, tableId, 
splitColumnName);
+final Object min = minMax[0];
+final Object max = minMax[1];
+if (min == null || max == null || min.equals(max)) {
+// empty table, or only one row, return full table scan as a chunk
+return Collections.singletonList(ChunkRange.all());
+}
+
+final int chunkSize = sourceConfig.getSplitSize();
+final double distributionFactorUpper = 
sourceConfig.getDistributionFactorUpper();
+final double distributionFactorLower = 
sourceConfig.getDistributionFactorLower();
+
+if (isEvenlySplitColumn(splitColumn)) {
+long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
+double distributionFactor =
+calculateDistributionFactor(tableId, min, max, 
approximateRowCnt);
+
+boolean dataIsEvenlyDistributed =
+doubleCompare(distributionFactor, distributionFactorLower) 
>= 0
+&& doubleCompare(distributionFactor, 
distributionFactorUpper) <= 0;
+
+if (dataIsEvenlyDistributed) {
+// the minimum dynamic chunk size is at least 1
+final int dynamicChunkSize = Math.max((int) 
(distributionFactor * chunkSize), 1);
+return splitEvenlySizedChunks(
+  
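
The distribution-factor arithmetic in the diff above can be tried in isolation. The following is a minimal sketch, not the code under review: it assumes `long` split keys instead of the `Object` values handled by `ObjectUtils.minus`, and the class name `DistributionFactorSketch` is hypothetical. It computes `factor = (max - min + 1) / rowCount` rounded up to four decimal places, then derives the dynamic chunk size with a floor of 1, as the quoted lines do.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical stand-alone illustration; names are not taken from flink-cdc.
final class DistributionFactorSketch {

    /** factor = (max - min + 1) / rowCount, rounded up to 4 decimal places. */
    static double distributionFactor(long min, long max, long approximateRowCnt) {
        if (approximateRowCnt == 0) {
            // Mirrors the quoted code: an empty table yields the largest possible factor.
            return Double.MAX_VALUE;
        }
        BigDecimal span =
                BigDecimal.valueOf(max).subtract(BigDecimal.valueOf(min)).add(BigDecimal.ONE);
        return span.divide(BigDecimal.valueOf(approximateRowCnt), 4, RoundingMode.CEILING)
                .doubleValue();
    }

    /** Scales the configured chunk size by the factor; the result is never below 1. */
    static int dynamicChunkSize(double distributionFactor, int chunkSize) {
        return Math.max((int) (distributionFactor * chunkSize), 1);
    }
}
```

With keys spread over twice as many values as there are rows, the factor is 2 and each chunk covers twice the configured key range, so chunks still hold roughly the configured number of rows.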

Re: [PR] [FLINK-35704] Use ForkJoinPool in NonSplittingRecursiveEnumerator [flink]

2024-07-02 Thread via GitHub


gliter commented on PR #24986:
URL: https://github.com/apache/flink/pull/24986#issuecomment-2194649007

   @flinkbot run azure





[jira] [Updated] (FLINK-35704) ForkJoinPool introduction to NonSplittingRecursiveEnumerator to vastly improve enumeration performance

2024-07-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35704:
---
Labels: pull-request-available  (was: )

> ForkJoinPool introduction to NonSplittingRecursiveEnumerator to vastly 
> improve enumeration performance
> --
>
> Key: FLINK-35704
> URL: https://issues.apache.org/jira/browse/FLINK-35704
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Grzegorz Liter
>Priority: Minor
>  Labels: pull-request-available
> Attachments: ParallelNonSplittingRecursiveEnumerator.java
>
>
> In the current implementation of NonSplittingRecursiveEnumerator, files and 
> directories are enumerated sequentially. When accessing remote storage 
> like S3, most of the time is wasted waiting for responses.
> Worse, the enumeration is done by the JM itself, which is unresponsive to 
> RPC calls while it runs. When accessing many (thousands or more) files, the 
> wait time can quickly add up and cause a Pekko timeout.
> The performance can be improved by enumerating files in parallel with, e.g., 
> ForkJoinPool and parallel streams. I am attaching an example implementation 
> that I am happy to contribute to the Flink repository.
> In my tests it cuts the time by at least 10x.
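
The idea described in the issue can be sketched as follows. This is a hedged illustration only, not the attached `ParallelNonSplittingRecursiveEnumerator.java`: it assumes the local `java.nio.file` API instead of Flink's `FileSystem`, uses a `RecursiveTask` rather than parallel streams, and the names `ParallelListingSketch` and `ListTask` are hypothetical. Each directory is listed in its own fork/join task, so slow listings of sibling directories can overlap instead of running one after another.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-alone illustration; not taken from the Flink code base.
final class ParallelListingSketch {

    /** Lists one directory and forks a sub-task for every child directory. */
    static final class ListTask extends RecursiveTask<List<Path>> {
        private final Path dir;

        ListTask(Path dir) {
            this.dir = dir;
        }

        @Override
        protected List<Path> compute() {
            List<Path> children;
            try (Stream<Path> entries = Files.list(dir)) {
                children = entries.collect(Collectors.toList());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            List<Path> files = new ArrayList<>();
            List<ListTask> subTasks = new ArrayList<>();
            for (Path child : children) {
                if (Files.isDirectory(child)) {
                    ListTask task = new ListTask(child);
                    task.fork(); // list the subdirectory asynchronously
                    subTasks.add(task);
                } else {
                    files.add(child);
                }
            }
            for (ListTask task : subTasks) {
                files.addAll(task.join()); // merge results from subdirectories
            }
            return files;
        }
    }

    /** Enumerates all regular files under root using a dedicated pool. */
    static List<Path> enumerate(Path root, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.invoke(new ListTask(root));
        } finally {
            pool.shutdown();
        }
    }
}
```

A dedicated pool keeps the blocking listing calls off the common pool; for remote storage the parallelism would likely be chosen by expected request latency rather than CPU count, which is an assumption to validate by measurement.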



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35704] Use ForkJoinPool in NonSplittingRecursiveEnumerator [flink]

2024-07-02 Thread via GitHub


gliter opened a new pull request, #24986:
URL: https://github.com/apache/flink/pull/24986

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





Re: [PR] [FLINK-35704] Use ForkJoinPool in NonSplittingRecursiveEnumerator [flink]

2024-07-02 Thread via GitHub


flinkbot commented on PR #24986:
URL: https://github.com/apache/flink/pull/24986#issuecomment-2191696248

   
   ## CI report:
   
   * 5cc83326c8958e4795b9d77d11d0c2793ad7366c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-35704] Use ForkJoinPool in NonSplittingRecursiveEnumerator [flink]

2024-07-02 Thread via GitHub


gliter commented on PR #24986:
URL: https://github.com/apache/flink/pull/24986#issuecomment-2191702558

   I would prefer the code to be structured a bit differently, mainly by not 
creating an ArrayList just to pass it as a parameter and by returning a stream 
from `addSplitsForPath` instead, but it has to stay like this to preserve the 
definition of this method, which is effectively public API for classes extending 
`NonSplittingRecursiveEnumerator`, which a few classes do.
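
The trade-off described in this comment can be illustrated with a small sketch. It is an assumption-laden stand-in, not the PR's code: `String` replaces Flink's path and split types, and `CollectorSignatureSketch`, `splitsFor`, and this simplified `addSplitsForPath` are hypothetical. The collector-style method keeps its signature, while an adapter gives each parallel call a private list and exposes the result as a stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-alone illustration; types and names are simplified stand-ins.
final class CollectorSignatureSketch {

    /** Stand-in for the existing extension point: results go into a caller-supplied list. */
    static void addSplitsForPath(String path, ArrayList<String> target) {
        target.add("split:" + path);
    }

    /** Adapter: a private list per call, so parallel callers never share mutable state. */
    static Stream<String> splitsFor(String path) {
        ArrayList<String> local = new ArrayList<>();
        addSplitsForPath(path, local);
        return local.stream();
    }

    /** Processes the paths in parallel and merges the per-call results in input order. */
    static List<String> enumerate(List<String> paths) {
        return paths.parallelStream()
                .flatMap(CollectorSignatureSketch::splitsFor)
                .collect(Collectors.toList());
    }
}
```

The extra list allocation per call is the cost of keeping the overridable signature unchanged for subclasses.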




