Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-10 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
                     "Submit continuous refresh job for materialized table {} occur exception.",
                     materializedTableIdentifier,
                     e);
-            throw new TableException(
+            throw new SqlExecutionException(
                     String.format(
                             "Submit continuous refresh job for materialized table %s occur exception.",
                             materializedTableIdentifier),
                     e);
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier);
+        if (MATERIALIZED_TABLE != table.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table);
+
+        // Set job name, runtime mode
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .get());
+        }
+
+        try {
+            LOG.debug(
+                    "Begin to manually refreshing the materialization table {}, statement: {}",
+                    materializedTableIdentifier,
+                    insertStatement);
+            return operationExecutor.executeStatement(
+                    handle, customConfig, insertStatement.toString());
+        } catch (Exception e) {
+            // log and throw exception
+            LOG.error(
+                    "Manually refreshing the materialization table {} occur exception.",
+                    materializedTableIdentifier,
+                    e);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Manually refreshing the materialization table %s occur exception.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private static void validatePartitionSpec(
+            Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) {
+        ResolvedSchema schema = table.getResolvedSchema();
+        Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+        Set<String> unknownPartitionKeys = new HashSet<>();
+        Set<String> nonStringPartitionKeys = new HashSet<>();
+
+        for (String partitionKey : partitionSpec.keySet()) {
+            if (!schema.getColumn(partitionKey).isPresent()) {
+                unknownPartitionKeys.add(partitionKey);
+                continue;
+            }
+
+            if (!schema.getColumn(partitionKey)
+                    .get()
+                    .getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .getFamilies()
+                    .contains(LogicalTypeFamil
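
The quoted hunk is cut off above. For context, here is a rough, self-contained sketch of the kind of partition-spec validation it appears to perform: collect unknown keys and non-string partition columns, then reject the spec. It uses a plain column-name-to-type map instead of Flink's ResolvedSchema and LogicalType classes; names and error wording are illustrative only, not the PR's actual code.

```java
// Hypothetical, simplified validation: partitionSpec keys must be existing
// columns and must be string-typed. Column types are modeled as plain strings
// here instead of Flink's resolved schema classes.
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class PartitionSpecValidationSketch {
    static void validatePartitionSpec(
            Map<String, String> partitionSpec, Map<String, String> columnTypes) {
        Set<String> unknownPartitionKeys = new HashSet<>();
        Set<String> nonStringPartitionKeys = new HashSet<>();
        for (String partitionKey : partitionSpec.keySet()) {
            String type = columnTypes.get(partitionKey);
            if (type == null) {
                unknownPartitionKeys.add(partitionKey);
            } else if (!type.startsWith("VARCHAR") && !type.equals("STRING")) {
                nonStringPartitionKeys.add(partitionKey);
            }
        }
        if (!unknownPartitionKeys.isEmpty() || !nonStringPartitionKeys.isEmpty()) {
            throw new IllegalArgumentException(
                    String.format(
                            "Invalid partition spec: unknown keys %s, non-string keys %s.",
                            unknownPartitionKeys, nonStringPartitionKeys));
        }
    }
}
```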


Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian commented on PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2105596048

   Thanks @aiwenmo for reviewing, addressed review comments above.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][docs] Fix dead links in documentations [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian commented on PR #3314:
URL: https://github.com/apache/flink-cdc/pull/3314#issuecomment-2105593186

   @leonardBang PTAL





[PR] [hotfix][docs] Fix dead links in documentations [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian opened a new pull request, #3314:
URL: https://github.com/apache/flink-cdc/pull/3314

   This PR fixes dead links introduced by #3310.





Re: [PR] [FLINK-35313][mysql-cdc] Add upsert changelog mode to avoid UPDATE_BEFORE records … [flink-cdc]

2024-05-10 Thread via GitHub


yeezychao commented on PR #1907:
URL: https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105588522

   > > @yuxiqian Turning on upsert mode still fails to filter -u data. I am 
very confused as to why the same PR application failed to test in cdc 3.2 
(flink 1.18) version, but it still works in version 2.2 (flink 1.15). 
Unfortunately, I have not found the reason yet. 
![image](https://private-user-images.githubusercontent.com/18387619/329751040-32609ad6-9730-4bf5-83c6-91f1343dc617.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MTU0MDA1MjAsIm5iZiI6MTcxNTQwMDIyMCwicGF0aCI6Ii8xODM4NzYxOS8zMjk3NTEwNDAtMzI2MDlhZDYtOTczMC00YmY1LTgzYzYtOTFmMTM0M2RjNjE3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA1MTElMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNTExVDA0MDM0MFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTI1NjQzNjYzN2I2YTU3NzAwYjA0ZDkzODIxMTU0ZTc2ZjQ1YmFiYmZmOTViNzk1MDNmNzBmNDkzYjIwMmIwNGUmWC1BbXotU2lnbmVkSGVhZG
 
Vycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.kfPitV5gbKlL5u6DiGPoxjmPCZUsBpg62S5dTlOigrs)
   > 
   > @yeezychao Maybe check the output log and confirm if MySQL source actually 
sends any `-U` events to downstream? IIRC Flink will automatically append a 
`ChangelogNormalize` node to backfill missing update before events if source 
doesn't provide it.
   
   You are right! The `ChangelogNormalize` node is indeed added under Flink 1.18, but it is not under Flink 1.15.
   
![image](https://github.com/apache/flink-cdc/assets/18387619/f25a44fe-df3a-43d6-b48e-98fc1be487d8)
   





[jira] [Commented] (FLINK-28867) Parquet reader support nested type in array/map type

2024-05-10 Thread Xingcan Cui (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845539#comment-17845539
 ] 

Xingcan Cui commented on FLINK-28867:
-

Hey [~jark], any plan to improve this in the near future? I feel that this is a 
blocker for Flink OLAP despite the data lake projects having their data 
readers/writers. Sometimes users would like to use Flink to process some raw 
parquet files.

> Parquet reader support nested type in array/map type
> 
>
> Key: FLINK-28867
> URL: https://issues.apache.org/jira/browse/FLINK-28867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.16.0
>Reporter: dalongliu
>Priority: Major
> Attachments: ReadParquetArray1.java, part-00121.parquet
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian commented on PR #3310:
URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105586766

   @leonardBang It seems I missed some `{{ref}}` hyperlinks. Will fix them first.





Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-10 Thread via GitHub


leonardBang commented on PR #3310:
URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105584829

   @yuxiqian Would appreciate it if you could also open PRs for the release-3.0 and release-3.1 branches.





Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-10 Thread via GitHub


leonardBang merged PR #3310:
URL: https://github.com/apache/flink-cdc/pull/3310





Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-10 Thread via GitHub


leonardBang commented on PR #3310:
URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105580668

   > @leonardBang Sure, reordered sidebar items based on DB-Engine Rankings.
   
   Thanks for the update; basing the order on the DB-Engines Ranking makes sense to me.





Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian commented on PR #3310:
URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105577742

   @leonardBang Sure, reordered sidebar items based on DB-Engine Rankings.





Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597351018


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##
@@ -221,73 +240,67 @@ private TableInfo 
getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
 return tableInfo;
 }
 
-private void transformSchema(TableId tableId, Schema schema) throws 
Exception {
-for (Tuple4, 
Optional, Boolean>
-transform : transforms) {
-Selectors selectors = transform.f0;
-if (selectors.isMatch(tableId) && transform.f1.isPresent()) {
-TransformProjection transformProjection = transform.f1.get();
+private Schema transformSchema(TableId tableId, Schema schema) throws 
Exception {
+List newSchemas = new ArrayList<>();
+for (PostTransformers transform : transforms) {
+Selectors selectors = transform.getSelectors();
+if (selectors.isMatch(tableId) && 
transform.getProjection().isPresent()) {
+TransformProjection transformProjection = 
transform.getProjection().get();
+TransformFilter transformFilter = 
transform.getFilter().orElse(null);
 if (transformProjection.isValid()) {
 if 
(!transformProjectionProcessorMap.containsKey(transformProjection)) {
 transformProjectionProcessorMap.put(
 transformProjection,
-
TransformProjectionProcessor.of(transformProjection));
+PostTransformProcessor.of(transformProjection, 
transformFilter));
 }
-TransformProjectionProcessor transformProjectionProcessor =
+PostTransformProcessor postTransformProcessor =
 
transformProjectionProcessorMap.get(transformProjection);
 // update the columns of projection and add the column of 
projection into Schema
-
transformProjectionProcessor.processSchemaChangeEvent(schema);
+
newSchemas.add(postTransformProcessor.processSchemaChangeEvent(schema));
 }
 }
 }
+if (newSchemas.isEmpty()) {
+return schema;
+}
+
+Schema firstSchema = newSchemas.get(0);
+newSchemas.stream()
+.skip(1)
+.forEach(
+testSchema -> {
+if (!testSchema.equals(firstSchema)) {
+throw new IllegalArgumentException(
+String.format(
+"Incompatible transform rules 
result found. Inferred schema: %s and %s",
+firstSchema, testSchema));
+}
+});
+return firstSchema;

Review Comment:
   Added `SchemaUtils.mergeCompatibleUpstreamSchema` to resolve this.
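
For readers following along, a very rough sketch of the idea behind merging (rather than equality-checking) the per-rule inferred schemas. The actual rules live in `SchemaUtils.mergeCompatibleUpstreamSchema` and are not shown in this thread; the widening rule below is purely an assumption for illustration.

```java
// Hypothetical illustration of the idea only: merge inferred schemas column by
// column instead of requiring them to be identical. Columns are modeled as
// name -> type-name strings for brevity; the real merging rules may differ.
import java.util.LinkedHashMap;
import java.util.Map;

final class SchemaMergeSketch {
    static Map<String, String> merge(Map<String, String> left, Map<String, String> right) {
        Map<String, String> merged = new LinkedHashMap<>(left);
        right.forEach(
                (name, type) ->
                        merged.merge(name, type, (a, b) -> a.equals(b) ? a : widen(a, b)));
        return merged;
    }

    // Assumed widening rule for the sketch: fall back to STRING when types differ.
    private static String widen(String a, String b) {
        return "STRING";
    }
}
```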






Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597350940


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##
@@ -384,10 +481,73 @@ public static SqlSelect parseFilterExpression(String 
filterExpression) {
 StringBuilder statement = new StringBuilder();
 statement.append("SELECT * FROM ");
 statement.append(DEFAULT_TABLE);
-if (!StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
+if (!isNullOrWhitespaceOnly(filterExpression)) {
 statement.append(" WHERE ");
 statement.append(filterExpression);
 }
 return parseSelect(statement.toString());
 }
+
+public static SqlNode rewriteExpression(SqlNode sqlNode, Map replaceMap) {
+if (sqlNode instanceof SqlBasicCall) {
+SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;

Review Comment:
   Replace with `SqlCall` instead of `SqlBasicCall` to cover `SqlCase`.
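
For context, a minimal sketch of the type check being discussed (assuming Calcite is on the classpath): `SqlCase` extends `SqlCall` but not `SqlBasicCall`, so matching on `SqlCall` also covers CASE expressions. The rewriting logic itself is elided; this is not the PR's actual code.

```java
// Sketch of the check only: recurse into any SqlCall's operands, which also
// covers CASE expressions (SqlCase). Identifier/literal replacement is elided.
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;

final class SqlNodeTraversalSketch {
    static void visit(SqlNode node) {
        if (node instanceof SqlCall) {
            for (SqlNode operand : ((SqlCall) node).getOperandList()) {
                if (operand != null) { // operand lists may contain null entries
                    visit(operand);
                }
            }
        }
        // Identifiers and literals would be handled here (e.g. looked up in a replace map).
    }
}
```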



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##
@@ -384,10 +481,73 @@ public static SqlSelect parseFilterExpression(String 
filterExpression) {
 StringBuilder statement = new StringBuilder();
 statement.append("SELECT * FROM ");
 statement.append(DEFAULT_TABLE);
-if (!StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
+if (!isNullOrWhitespaceOnly(filterExpression)) {
 statement.append(" WHERE ");
 statement.append(filterExpression);
 }
 return parseSelect(statement.toString());
 }
+
+public static SqlNode rewriteExpression(SqlNode sqlNode, Map replaceMap) {
+if (sqlNode instanceof SqlBasicCall) {
+SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;

Review Comment:
   Check `SqlCall` instead of `SqlBasicCall` to cover `SqlCase`.






Re: [PR] [FLINK-35313][mysql-cdc] Add upsert changelog mode to avoid UPDATE_BEFORE records … [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian commented on PR #1907:
URL: https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105530310

   > @yuxiqian Turning on upsert mode still fails to filter -u data. I am very 
confused as to why the same PR application failed to test in cdc 3.2 (flink 
1.18) version, but it still works in version 2.2 (flink 1.15). Unfortunately, I 
have not found the reason yet. 
![image](https://private-user-images.githubusercontent.com/18387619/329751040-32609ad6-9730-4bf5-83c6-91f1343dc617.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MTU0MDA1MjAsIm5iZiI6MTcxNTQwMDIyMCwicGF0aCI6Ii8xODM4NzYxOS8zMjk3NTEwNDAtMzI2MDlhZDYtOTczMC00YmY1LTgzYzYtOTFmMTM0M2RjNjE3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA1MTElMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNTExVDA0MDM0MFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTI1NjQzNjYzN2I2YTU3NzAwYjA0ZDkzODIxMTU0ZTc2ZjQ1YmFiYmZmOTViNzk1MDNmNzBmNDkzYjIwMmIwNGUmWC1BbXotU2lnbmVkSGVhZGVy
 
cz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.kfPitV5gbKlL5u6DiGPoxjmPCZUsBpg62S5dTlOigrs)
   
   Maybe check the printed log and confirm whether the MySQL source actually sends any `-U` events downstream? IIRC Flink will automatically append a `ChangelogNormalize` node to backfill the missing update-before events if the source doesn't provide them.
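
For reference, one way to check whether the planner added such a node is to print the optimized plan and search it. A small sketch (assuming a CDC-backed table named `orders` has already been registered in the environment):

```java
// Hypothetical check, not part of the PR: explain the query and look for a
// ChangelogNormalize node in the optimized plan. The table name is a placeholder.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChangelogNormalizeCheck {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Assumes a CDC-backed table named `orders` was registered beforehand.
        String plan = tEnv.explainSql("SELECT id, amount FROM orders");
        System.out.println(
                plan.contains("ChangelogNormalize")
                        ? "planner added ChangelogNormalize"
                        : "no ChangelogNormalize in the plan");
    }
}
```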





Re: [PR] [FLINK-35313][mysql-cdc] Add upsert changelog mode to avoid UPDATE_BEFORE records … [flink-cdc]

2024-05-10 Thread via GitHub


yeezychao commented on PR #1907:
URL: https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105523327

   @yuxiqian Turning on upsert mode still fails to filter -u data. I am very 
confused as to why the same PR application failed to test in cdc 3.2 (flink 
1.18) version, but it still works in version 2.2 (flink 1.15). Unfortunately, I 
have not found the reason yet.
   
![image](https://github.com/apache/flink-cdc/assets/18387619/32609ad6-9730-4bf5-83c6-91f1343dc617)
   





[jira] [Comment Edited] (FLINK-35109) Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support for 1.17 and 1.18

2024-05-10 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845529#comment-17845529
 ] 

Hang Ruan edited comment on FLINK-35109 at 5/11/24 3:26 AM:


I would like to help it. 


was (Author: ruanhang1993):
I would like to help. Please assign this to me, thanks~

> Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support 
> for 1.17 and 1.18
> ---
>
> Key: FLINK-35109
> URL: https://issues.apache.org/jira/browse/FLINK-35109
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Fabian Paul
>Priority: Blocker
> Fix For: kafka-4.0.0
>
>
> The Flink Kafka connector currently can't compile against Flink 
> 1.20-SNAPSHOT. An example failure can be found at 
> https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169
> The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, 
> see FLINK-32455. See also specifically the comment in 
> https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785
> Next to that, there's also FLINK-25509 which can only be supported with Flink 
> 1.19 and higher. 
> So we should:
> * Drop support for 1.17 and 1.18
> * Refactor the Flink Kafka connector to use the new 
> {code:java}MigrationTest{code}
> We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; 
> this change will be a new v4.0 version with support for Flink 1.19 and the 
> upcoming Flink 1.20





[jira] [Commented] (FLINK-35109) Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support for 1.17 and 1.18

2024-05-10 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845529#comment-17845529
 ] 

Hang Ruan commented on FLINK-35109:
---

I would like to help. Please assign this to me, thanks~

> Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support 
> for 1.17 and 1.18
> ---
>
> Key: FLINK-35109
> URL: https://issues.apache.org/jira/browse/FLINK-35109
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Fabian Paul
>Priority: Blocker
> Fix For: kafka-4.0.0
>
>
> The Flink Kafka connector currently can't compile against Flink 
> 1.20-SNAPSHOT. An example failure can be found at 
> https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169
> The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, 
> see FLINK-32455. See also specifically the comment in 
> https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785
> Next to that, there's also FLINK-25509 which can only be supported with Flink 
> 1.19 and higher. 
> So we should:
> * Drop support for 1.17 and 1.18
> * Refactor the Flink Kafka connector to use the new 
> {code:java}MigrationTest{code}
> We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; 
> this change will be a new v4.0 version with support for Flink 1.19 and the 
> upcoming Flink 1.20





[jira] [Comment Edited] (FLINK-35112) Membership for Row class does not include field names

2024-05-10 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845526#comment-17845526
 ] 

Dian Fu edited comment on FLINK-35112 at 5/11/24 2:56 AM:
--

Fix in 
- master via b4d71144de8e3772257804b6ed8ad688076430d6

- release-1.19 via d16b20e4fb4fb906927188ca11af599edd0953c1


was (Author: dianfu):
Fix in master via b4d71144de8e3772257804b6ed8ad688076430d6

> Membership for Row class does not include field names
> -
>
> Key: FLINK-35112
> URL: https://issues.apache.org/jira/browse/FLINK-35112
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.18.1
>Reporter: Wouter Zorgdrager
>Assignee: Wouter Zorgdrager
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> In the Row class in PyFlink I cannot do a membership check for field names. 
> This minimal example will show the unexpected behavior:
> ```
> from pyflink.common import Row
> row = Row(name="Alice", age=11)
> # Expected to be True, but is False
> print("name" in row)
> person = Row("name", "age")
> # This is True, as expected
> print('name' in person)
> ```
> The related code in the Row class is:
> ```
>     def __contains__(self, item):
>         return item in self._values
> ```
> It should be relatively easy to fix with the following code:
> ```
>     def __contains__(self, item):
>         if hasattr(self, "_fields"):
>             return item in self._fields
>         else:
>             return item in self._values
> ```





[jira] [Updated] (FLINK-35112) Membership for Row class does not include field names

2024-05-10 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-35112:

Fix Version/s: 1.19.1

> Membership for Row class does not include field names
> -
>
> Key: FLINK-35112
> URL: https://issues.apache.org/jira/browse/FLINK-35112
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.18.1
>Reporter: Wouter Zorgdrager
>Assignee: Wouter Zorgdrager
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.1
>
>
> In the Row class in PyFlink I cannot do a membership check for field names. 
> This minimal example will show the unexpected behavior:
> ```
> from pyflink.common import Row
> row = Row(name="Alice", age=11)
> # Expected to be True, but is False
> print("name" in row)
> person = Row("name", "age")
> # This is True, as expected
> print('name' in person)
> ```
> The related code in the Row class is:
> ```
>     def __contains__(self, item):
>         return item in self._values
> ```
> It should be relatively easy to fix with the following code:
> ```
>     def __contains__(self, item):
>         if hasattr(self, "_fields"):
>             return item in self._fields
>         else:
>             return item in self._values
> ```





[jira] [Closed] (FLINK-35112) Membership for Row class does not include field names

2024-05-10 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-35112.
---
Fix Version/s: 1.20.0
   Resolution: Fixed

Fix in master via b4d71144de8e3772257804b6ed8ad688076430d6

> Membership for Row class does not include field names
> -
>
> Key: FLINK-35112
> URL: https://issues.apache.org/jira/browse/FLINK-35112
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.18.1
>Reporter: Wouter Zorgdrager
>Assignee: Wouter Zorgdrager
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> In the Row class in PyFlink I cannot do a membership check for field names. 
> This minimal example will show the unexpected behavior:
> ```
> from pyflink.common import Row
> row = Row(name="Alice", age=11)
> # Expected to be True, but is False
> print("name" in row)
> person = Row("name", "age")
> # This is True, as expected
> print('name' in person)
> ```
> The related code in the Row class is:
> ```
>     def __contains__(self, item):
>         return item in self._values
> ```
> It should be relatively easy to fix with the following code:
> ```
>     def __contains__(self, item):
>         if hasattr(self, "_fields"):
>             return item in self._fields
>         else:
>             return item in self._values
> ```





[jira] [Assigned] (FLINK-35112) Membership for Row class does not include field names

2024-05-10 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-35112:
---

Assignee: Wouter Zorgdrager

> Membership for Row class does not include field names
> -
>
> Key: FLINK-35112
> URL: https://issues.apache.org/jira/browse/FLINK-35112
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.18.1
>Reporter: Wouter Zorgdrager
>Assignee: Wouter Zorgdrager
>Priority: Minor
>  Labels: pull-request-available
>
> In the Row class in PyFlink I cannot do a membership check for field names. 
> This minimal example will show the unexpected behavior:
> ```
> from pyflink.common import Row
> row = Row(name="Alice", age=11)
> # Expected to be True, but is False
> print("name" in row)
> person = Row("name", "age")
> # This is True, as expected
> print('name' in person)
> ```
> The related code in the Row class is:
> ```
>     def __contains__(self, item):
>         return item in self._values
> ```
> It should be relatively easy to fix with the following code:
> ```
>     def __contains__(self, item):
>         if hasattr(self, "_fields"):
>             return item in self._fields
>         else:
>             return item in self._values
> ```





Re: [PR] [FLINK-35112][python] Fix membership for Row class PyFlink [flink]

2024-05-10 Thread via GitHub


dianfu merged PR #24756:
URL: https://github.com/apache/flink/pull/24756





Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]

2024-05-10 Thread via GitHub


aiwenmo commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597329925


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##
@@ -362,25 +376,51 @@ private Optional processFilter(
 }
 
 private Optional processProjection(
-TransformProjectionProcessor transformProjectionProcessor,
+PostTransformProcessor postTransformProcessor,
 DataChangeEvent dataChangeEvent,
-long epochTime)
-throws Exception {
+long epochTime) {
 BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
 BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
 if (before != null) {
 BinaryRecordData projectedBefore =
-transformProjectionProcessor.processData(before, 
epochTime);
+postTransformProcessor.processData(before, epochTime);
+dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, 
projectedBefore);
+}
+if (after != null) {
+BinaryRecordData projectedAfter = 
postTransformProcessor.processData(after, epochTime);
+dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, 
projectedAfter);
+}
+return Optional.of(dataChangeEvent);
+}
+
+private Optional processPostProjection(
+TableInfo tableInfo, DataChangeEvent dataChangeEvent) throws 
Exception {
+BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
+BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
+if (before != null) {
+BinaryRecordData projectedBefore = projectRecord(tableInfo, 
before);
 dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, 
projectedBefore);
 }
 if (after != null) {
-BinaryRecordData projectedAfter =
-transformProjectionProcessor.processData(after, epochTime);
+BinaryRecordData projectedAfter = projectRecord(tableInfo, after);
 dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, 
projectedAfter);
 }
 return Optional.of(dataChangeEvent);
 }
 
+private BinaryRecordData projectRecord(TableInfo tableInfo, 
BinaryRecordData recordData) {
+List valueList = new ArrayList<>();
+RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters();
+
+for (RecordData.FieldGetter fieldGetter : fieldGetters) {
+valueList.add(fieldGetter.getFieldOrNull(recordData));
+}
+
+return tableInfo
+.getRecordDataGenerator()
+.generate(valueList.toArray(new Object[valueList.size()]));
+}
+
 private boolean containFilteredComputedColumn(String projection, String 
filter) {
 boolean contain = false;

Review Comment:
   Due to the filter always executing before the projection, this method is no 
longer useful.



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.schema.Selectors;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+/** Transformation rules used by {@link PostTransformOperator}. */
+public class PostTransformers {
+private final Selectors selectors;
+
+private final Optional projection;
+private final Optional filter;
+
+private final boolean containFilteredComputedColumn;
+

Review Comment:
   Due to the filter always executing before the projection, this private 
variable is no longer useful.



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##
@@ -221,73 +240,67 @@ private TableInfo 
getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
 return tableInfo;
 }
 
-private void transformSchema(TableId tableId, Schema schema) throws 
Exception

Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-10 Thread via GitHub


lsyldliu commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597329094


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
 "Submit continuous refresh job for materialized table {} 
occur exception.",
 materializedTableIdentifier,
 e);
-throw new TableException(
+throw new SqlExecutionException(
 String.format(
 "Submit continuous refresh job for materialized 
table %s occur exception.",
 materializedTableIdentifier),
 e);
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = 
operationExecutor.getTable(materializedTableIdentifier);
+if (MATERIALIZED_TABLE != table.getTableKind()) {
+throw new ValidationException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+// Set job name, runtime mode
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT OVERWRITE %s SELECT * FROM (%s)",
+materializedTableIdentifier,
+materializedTable.getDefinitionQuery()));
+
+if (!partitionSpec.isEmpty()) {
+insertStatement.append(" WHERE ");
+insertStatement.append(
+partitionSpec.entrySet().stream()
+.map(
+entry ->
+String.format(
+"%s = '%s'", 
entry.getKey(), entry.getValue()))
+.reduce((s1, s2) -> s1 + " AND " + s2)
+.get());
+}
+
+try {
+LOG.debug(
+"Begin to manually refreshing the materialization table 
{}, statement: {}",
+materializedTableIdentifier,
+insertStatement);
+return operationExecutor.executeStatement(
+handle, customConfig, insertStatement.toString());
+} catch (Exception e) {
+// log and throw exception
+LOG.error(
+"Manually refreshing the materialization table {} occur 
exception.",
+materializedTableIdentifier,
+e);
+throw new SqlExecutionException(
+String.format(
+"Manually refreshing the materialization table %s 
occur exception.",
+materializedTableIdentifier),
+e);
+}
+}
+
+private static void validatePartitionSpec(
+Map partitionSpec, 
ResolvedCatalogMaterializedTable table) {
+ResolvedSchema schema = table.getResolvedSchema();
+Set allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+Set unknownPartitionKeys = new HashSet<>();
+Set nonStringPartitionKeys = new HashSet<>();
+
+for (String partitionKey : partitionSpec.keySet()) {
+if (!schema.getColumn(partitionKey).isPresent()) {
+unknownPartitionKeys.add(partitionKey);
+continue;
+}
+
+if (!schema.getColumn(partitionKey)
+.get()
+.getDataType()
+.getLogicalType()
+.getTypeRoot()
+.getFamilies()
+.contains(LogicalTypeFamily.C

Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-10 Thread via GitHub


lsyldliu commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597326389


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
 "Submit continuous refresh job for materialized table {} 
occur exception.",
 materializedTableIdentifier,
 e);
-throw new TableException(
+throw new SqlExecutionException(
 String.format(
 "Submit continuous refresh job for materialized 
table %s occur exception.",
 materializedTableIdentifier),
 e);
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = 
operationExecutor.getTable(materializedTableIdentifier);
+if (MATERIALIZED_TABLE != table.getTableKind()) {
+throw new ValidationException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+// Set job name, runtime mode
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT OVERWRITE %s SELECT * FROM (%s)",
+materializedTableIdentifier,
+materializedTable.getDefinitionQuery()));
+
+if (!partitionSpec.isEmpty()) {
+insertStatement.append(" WHERE ");

Review Comment:
   I think the WHERE clause could be better formatted on a new line
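
As a rough sketch of that suggestion, the statement could be assembled so that the SELECT and the WHERE clause each start on their own line; method and parameter names here are illustrative, not the PR's actual code:

```java
// Hypothetical sketch of the formatting suggestion above: the generated refresh
// statement places SELECT and WHERE on separate lines.
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class RefreshStatementSketch {
    static String buildRefreshStatement(
            String tableIdentifier, String definitionQuery, Map<String, String> partitionSpec) {
        StringBuilder statement =
                new StringBuilder(
                        String.format(
                                "INSERT OVERWRITE %s%n  SELECT * FROM (%s)",
                                tableIdentifier, definitionQuery));
        if (!partitionSpec.isEmpty()) {
            statement.append(String.format("%n  WHERE "));
            statement.append(
                    partitionSpec.entrySet().stream()
                            .map(e -> String.format("%s = '%s'", e.getKey(), e.getValue()))
                            .collect(Collectors.joining(" AND ")));
        }
        return statement.toString();
    }

    public static void main(String[] args) {
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("ds", "2024-05-10");
        // Prints the statement with SELECT and WHERE each on their own line.
        System.out.println(buildRefreshStatement("my_catalog.db.mv1", "SELECT * FROM src", spec));
    }
}
```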



##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
 "Submit continuous refresh job for materialized table {} 
occur exception.",
 materializedTableIdentifier,
 e);
-throw new TableException(
+throw new SqlExecutionException(
 String.format(
 "Submit continuous refresh job for materialized 
table %s occur exception.",
 materializedTableIdentifier),
 e);
 }
 }
 
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table = 
operationExecutor.getTable(materializedTableIdentifier);
+if (MATERIALIZED_TABLE != table.getTableKind()) {
+throw new ValidationException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+// Set job name, runtime mode
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableSt

Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-10 Thread via GitHub


leonardBang commented on PR #3310:
URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105456881

   Thanks @yuxiqian for the contribution. Could we put the pipeline connectors in the first place?
   





Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]

2024-05-10 Thread via GitHub


fredia commented on code in PR #24768:
URL: https://github.com/apache/flink/pull/24768#discussion_r1597330403


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java:
##
@@ -147,15 +160,30 @@ public  S createState(@Nonnull 
StateDescriptor stateDes
 @Override
 @Nonnull
 public StateExecutor createStateExecutor() {
-// TODO: Make io parallelism configurable
-return new ForStStateExecutor(4, db, 
optionsContainer.getWriteOptions());
+synchronized (lock) {
+if (disposed) {
+throw new FlinkRuntimeException(
+"Attempt to create StateExecutor after 
ForStKeyedStateBackend is disposed.");
+}
+// TODO: Make io parallelism configurable
+StateExecutor stateExecutor =
+new ForStStateExecutor(4, db, 
optionsContainer.getWriteOptions());
+managedStateExecutors.add(stateExecutor);
+return stateExecutor;
+}
 }
 
 /** Should only be called by one thread, and only after all accesses to 
the DB happened. */
 @Override
 public void dispose() {
-if (this.disposed) {
-return;
+synchronized (lock) {
+if (disposed) {
+return;
+}
+disposed = true;
+for (StateExecutor executor : managedStateExecutors) {

Review Comment:
   Can this `for-loop` be placed outside the lock?
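
For reference, a small self-contained model of the pattern this comment is asking about: snapshot the tracked resources while holding the lock, then close them outside the lock so slow shutdown work never blocks other synchronized methods. The class and method names below are illustrative, not the ForStKeyedStateBackend API.

```java
// Hypothetical sketch: the lock only guards the bookkeeping; the actual closing
// of resources happens after the synchronized block is left.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DisposeOutsideLockSketch {
    private final Object lock = new Object();
    private final Set<AutoCloseable> managedResources = new HashSet<>();
    private boolean disposed;

    void register(AutoCloseable resource) {
        synchronized (lock) {
            if (disposed) {
                throw new IllegalStateException("Already disposed");
            }
            managedResources.add(resource);
        }
    }

    void dispose() throws Exception {
        final List<AutoCloseable> toClose;
        synchronized (lock) {
            if (disposed) {
                return;
            }
            disposed = true;
            toClose = new ArrayList<>(managedResources);
            managedResources.clear();
        }
        // Closing happens outside the lock, so register()/dispose() callers are not blocked.
        for (AutoCloseable resource : toClose) {
            resource.close();
        }
    }
}
```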






Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]

2024-05-10 Thread via GitHub


1996fanrui commented on code in PR #24770:
URL: https://github.com/apache/flink/pull/24770#discussion_r1597329545


##
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java:
##
@@ -343,7 +343,11 @@ private static StreamStateHandle 
createDummySegmentFileStateHandle(Random rnd) {
 private static StreamStateHandle createDummySegmentFileStateHandle(
 Random rnd, boolean isEmpty) {
 return isEmpty
-? TestingSegmentFileStateHandle.EMPTY_INSTANCE
+? new TestingSegmentFileStateHandle(
+new Path(UUID.randomUUID().toString()),

Review Comment:
   Two tests[1] failed; I changed the path from `empty` to a random string to fix them.
   
   ## Reason:
   
   SharedStateRegistryImpl#registerReference uses the `registrationKey` to check whether a StreamStateHandle is the same. The `registrationKey` is generated from the path name.
   
   So every `empty` path produces the same `registrationKey`, even though the StreamStateHandles are not the same.
   
   ## My question:
   
   Is there a strong reason to use `empty` as the path name? If not, the current change is fine.
   
   [1] 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59448&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=9594
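
To illustrate the collision described above with a standalone toy example (the key derivation below is a stand-in, not `SharedStateRegistryImpl`'s actual logic):

```java
// Hypothetical illustration: if the registration key is derived from the handle's
// path name, two distinct handles that both use the path "empty" map to the same
// key, while random paths (as in the fix) stay distinct.
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class RegistrationKeyCollision {
    public static void main(String[] args) {
        Map<String, String> registry = new HashMap<>();
        registry.put(registrationKeyFromPath("empty"), "handle-A");
        registry.put(registrationKeyFromPath("empty"), "handle-B"); // overwrites handle-A
        System.out.println(registry.size()); // 1 -> the two handles collide

        String keyC = registrationKeyFromPath(UUID.randomUUID().toString());
        String keyD = registrationKeyFromPath(UUID.randomUUID().toString());
        System.out.println(keyC.equals(keyD)); // false -> no collision with random paths
    }

    // Stand-in for deriving a registration key from a path name.
    private static String registrationKeyFromPath(String pathName) {
        return "shared-state/" + pathName;
    }
}
```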






[PR] [FLINK-35323][runtime] Fix transform failure when one rule matches multiple tables with incompatible schema [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian opened a new pull request, #3313:
URL: https://github.com/apache/flink-cdc/pull/3313

   This closes [FLINK-35323](https://issues.apache.org/jira/browse/FLINK-35323).
   
   Currently, the transform module doesn’t support handling multiple tables with different schemas within one projection/filtering rule. A minimal reproducible example is:
   
   ```yaml
   transform:
 - source-table: DB.TABLE\.*
   projection: \*
   ```
   
   If the upstream source contains `DB.TABLE1` and `DB.TABLE2` with different schemas, the transform operator will fail.
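
   As a rough illustration of the direction of a fix (the names are made up, not the actual flink-cdc classes): caching processors per (table, rule) pair instead of per rule keeps one processor, and therefore one schema, per matching table.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.Function;

   class PerTableProcessorCacheSketch<P> {
       private final Map<String, P> processors = new HashMap<>();

       P processorFor(String tableId, String rule, Function<String, P> createForTable) {
           // Composite key: the same rule applied to different tables gets
           // different processors, so each table keeps its own schema.
           return processors.computeIfAbsent(
                   tableId + "|" + rule, key -> createForTable.apply(tableId));
       }
   }
   ```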


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305](https://issues.apache.org/jira/browse/FLINK-35305)Amazon SQS Sink Connector [flink-connector-aws]

2024-05-10 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1597276173


##
flink-connector-aws/flink-connector-sqs/archunit-violations/stored.rules:
##
@@ -0,0 +1,4 @@
+#
+#Tue Feb 22 12:19:26 CET 2022
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ 
ITCase=54da9a7d-14d2-4632-a045-1dd8fc665c8f

Review Comment:
   I am not sure whether these auto-generated files should be added to the PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35305) FLIP-438: Amazon SQS Sink Connector

2024-05-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35305:
---
Labels: pull-request-available  (was: )

> FLIP-438: Amazon SQS Sink Connector
> ---
>
> Key: FLINK-35305
> URL: https://issues.apache.org/jira/browse/FLINK-35305
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / AWS
>Reporter: Priya Dhingra
>Priority: Major
>  Labels: pull-request-available
>
> This is an umbrella task for FLIP-438. FLIP-438: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35305](https://issues.apache.org/jira/browse/FLINK-35305)Amazon SQS Sink Connector [flink-connector-aws]

2024-05-10 Thread via GitHub


boring-cyborg[bot] commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2105339886

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35305](https://issues.apache.org/jira/browse/FLINK-35305)Amazon SQS Sink Connector [flink-connector-aws]

2024-05-10 Thread via GitHub


19priyadhingra opened a new pull request, #141:
URL: https://github.com/apache/flink-connector-aws/pull/141

   ## Purpose of the change
   
   Implements the Amazon SQS Sink Connector
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   Added unit tests
   Added integration tests
   Manually verified by running the SQS connector on a local Flink cluster.
   
   ## Significant changes
   *(Please check any boxes [x] if the answer is "yes". You can first publish 
the PR and check them afterwards, for convenience.)*
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [x] New feature has been introduced
 - If yes, how is this documented? (not applicable / docs / JavaDocs / not 
documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-10 Thread via GitHub


snuyanzin commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2105169563

   Maybe it is an unpopular opinion, however I tend to think that `INTERSECT` vs `INTERSECT ALL` (and the same for the other set/bag operators) is defined for rows and can hardly be applied to collections. I failed to find a standard approach for collections like arrays (I mean in the SQL Standard).
   Probably that is one of the reasons a number of vendors handle this differently. On one side, we could say that we should follow the same approach as for rows. The problem I see here is that by default we would remove duplicates; what should we do if we want to keep them? There is no well-known vendor providing both `array_intersect` and `array_intersect_all`, or keep/remove duplicates as a parameter.
   On the other side, if we keep duplicates then we can still cover both cases.
   For the case with duplicates, we can do
   ```
   array_intersect(array1, array2)
   ```
   and for the case without duplicates
   ```
   array_distinct(array_intersect(array1, array2))
   ```
   
   Yes, `array_union` looks like an exception here; however, if we compare against vendors, it is a global exception only because there is a nice synonym used by another function, `array_concat`. So if we want to concatenate arrays without duplicates we use `array_union`, and with duplicates we use `array_concat`. The problem is that not every function has such a workaround.
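
   To make the two semantics concrete, a toy Java sketch (illustration only, not Flink's runtime code) of a bag-style intersect that keeps duplicates, plus the distinct variant obtained by de-duplicating its result:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.LinkedHashSet;
   import java.util.List;

   public class ArrayIntersectSemanticsSketch {
       static <T> List<T> intersectKeepDuplicates(List<T> a, List<T> b) {
           List<T> remaining = new ArrayList<>(b);
           List<T> result = new ArrayList<>();
           for (T element : a) {
               // Keep one occurrence per match still available in b.
               if (remaining.remove(element)) {
                   result.add(element);
               }
           }
           return result;
       }

       public static void main(String[] args) {
           List<Integer> a = Arrays.asList(1, 1, 2, 3);
           List<Integer> b = Arrays.asList(1, 1, 3, 4);

           List<Integer> withDuplicates = intersectKeepDuplicates(a, b);
           System.out.println(withDuplicates); // [1, 1, 3]

           // Equivalent of array_distinct(array_intersect(a, b)).
           System.out.println(new ArrayList<>(new LinkedHashSet<>(withDuplicates))); // [1, 3]
       }
   }
   ```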


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35322][Connectors/Google PubSub] Remove weekly tests of 1.19 for unsupporting version v3.0 [flink-connector-gcp-pubsub]

2024-05-10 Thread via GitHub


snuyanzin commented on PR #26:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/26#issuecomment-2104973419

   @vahmed-hamdy thanks for the contribution 
   could you please schedule a weekly job on your fork to be sure it passes and 
put a link here?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Flink 33461 [flink-connector-jdbc]

2024-05-10 Thread via GitHub


RocMarshal closed pull request #119: Flink 33461
URL: https://github.com/apache/flink-connector-jdbc/pull/119


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35318) incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during predicate pushdown

2024-05-10 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan reassigned FLINK-35318:
-

Assignee: linshangquan  (was: Jane Chan)

> incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during 
> predicate pushdown
> -
>
> Key: FLINK-35318
> URL: https://issues.apache.org/jira/browse/FLINK-35318
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.18.1
> Environment: flink version 1.18.1
> iceberg version 1.15.1
>Reporter: linshangquan
>Assignee: linshangquan
>Priority: Major
> Attachments: image-2024-05-09-14-06-58-007.png, 
> image-2024-05-09-14-09-38-453.png, image-2024-05-09-14-11-38-476.png, 
> image-2024-05-09-14-22-14-417.png, image-2024-05-09-14-22-59-370.png, 
> image-2024-05-09-18-52-03-741.png, image-2024-05-09-18-52-28-584.png
>
>
> In our scenario, we have an Iceberg table that contains a column named 'time' 
> of the {{timestamptz}} data type. This column has 10 rows of data where the 
> 'time' value is {{'2024-04-30 07:00:00'}} expressed in the "Asia/Shanghai" 
> timezone.
> !image-2024-05-09-14-06-58-007.png!
>  
> We encountered a strange phenomenon when accessing the table using 
> Iceberg-flink.
> When the {{WHERE}} clause includes the {{time}} column, the results are 
> incorrect.
> ZoneId.{_}systemDefault{_}() = "Asia/Shanghai" 
> !image-2024-05-09-18-52-03-741.png!
> When there is no {{WHERE}} clause, the results are correct.
> !image-2024-05-09-18-52-28-584.png!
> During debugging, we found that when a {{WHERE}} clause is present, a 
> {{FilterPushDownSpec}} is generated, and this {{FilterPushDownSpec}} utilizes 
> {{RexNodeToExpressionConverter}} for translation.
> !image-2024-05-09-14-11-38-476.png!
> !image-2024-05-09-14-22-59-370.png!
> When {{RexNodeToExpressionConverter#visitLiteral}} encounters a 
> {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type, it uses the specified timezone 
> "Asia/Shanghai" to convert the {{TimestampString}} type to an {{Instant}} 
> type. However, the upstream {{TimestampString}} data has already been 
> processed in UTC timezone. By applying the local timezone processing here, an 
> error occurs due to the mismatch in timezones.
> The question is whether the handling of {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} data in 
> {{RexNodeToExpressionConverter#visitLiteral}} is a bug, and whether it should 
> process the data in the UTC timezone.
>  
> Please help confirm if this is the issue, and if so, we can submit a patch to 
> fix it.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35318) incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during predicate pushdown

2024-05-10 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan reassigned FLINK-35318:
-

Assignee: Jane Chan

> incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during 
> predicate pushdown
> -
>
> Key: FLINK-35318
> URL: https://issues.apache.org/jira/browse/FLINK-35318
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.18.1
> Environment: flink version 1.18.1
> iceberg version 1.15.1
>Reporter: linshangquan
>Assignee: Jane Chan
>Priority: Major
> Attachments: image-2024-05-09-14-06-58-007.png, 
> image-2024-05-09-14-09-38-453.png, image-2024-05-09-14-11-38-476.png, 
> image-2024-05-09-14-22-14-417.png, image-2024-05-09-14-22-59-370.png, 
> image-2024-05-09-18-52-03-741.png, image-2024-05-09-18-52-28-584.png
>
>
> In our scenario, we have an Iceberg table that contains a column named 'time' 
> of the {{timestamptz}} data type. This column has 10 rows of data where the 
> 'time' value is {{'2024-04-30 07:00:00'}} expressed in the "Asia/Shanghai" 
> timezone.
> !image-2024-05-09-14-06-58-007.png!
>  
> We encountered a strange phenomenon when accessing the table using 
> Iceberg-flink.
> When the {{WHERE}} clause includes the {{time}} column, the results are 
> incorrect.
> ZoneId.{_}systemDefault{_}() = "Asia/Shanghai" 
> !image-2024-05-09-18-52-03-741.png!
> When there is no {{WHERE}} clause, the results are correct.
> !image-2024-05-09-18-52-28-584.png!
> During debugging, we found that when a {{WHERE}} clause is present, a 
> {{FilterPushDownSpec}} is generated, and this {{FilterPushDownSpec}} utilizes 
> {{RexNodeToExpressionConverter}} for translation.
> !image-2024-05-09-14-11-38-476.png!
> !image-2024-05-09-14-22-59-370.png!
> When {{RexNodeToExpressionConverter#visitLiteral}} encounters a 
> {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type, it uses the specified timezone 
> "Asia/Shanghai" to convert the {{TimestampString}} type to an {{Instant}} 
> type. However, the upstream {{TimestampString}} data has already been 
> processed in UTC timezone. By applying the local timezone processing here, an 
> error occurs due to the mismatch in timezones.
> The question is whether the handling of {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} data in 
> {{RexNodeToExpressionConverter#visitLiteral}} is a bug, and whether it should 
> process the data in the UTC timezone.
>  
> Please help confirm if this is the issue, and if so, we can submit a patch to 
> fix it.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35318) incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during predicate pushdown

2024-05-10 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845408#comment-17845408
 ] 

Jane Chan commented on FLINK-35318:
---

Hi [~linshangquan], thanks for reporting this issue. Your understanding is 
correct. 
RexNodeToExpressionConverter#visitLiteral should not convert the literal to UTC 
again, since this conversion has already been done during the SQL-to-Rel phase.
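
As a toy java.time illustration of that double conversion (not the planner/Calcite code): a wall-clock value that already represents UTC shifts by eight hours if it is interpreted again in Asia/Shanghai.

{code:java}
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimestampLiteralSketch {
    public static void main(String[] args) {
        // Wall-clock value that is already the UTC representation of the literal.
        LocalDateTime literal = LocalDateTime.parse("2024-04-29T23:00:00");

        Instant keptInUtc = literal.toInstant(ZoneOffset.UTC);
        Instant shiftedAgain = literal.atZone(ZoneId.of("Asia/Shanghai")).toInstant();

        System.out.println(keptInUtc);     // 2024-04-29T23:00:00Z
        System.out.println(shiftedAgain);  // 2024-04-29T15:00:00Z (8 hours off)
    }
}
{code}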

> incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during 
> predicate pushdown
> -
>
> Key: FLINK-35318
> URL: https://issues.apache.org/jira/browse/FLINK-35318
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.18.1
> Environment: flink version 1.18.1
> iceberg version 1.15.1
>Reporter: linshangquan
>Priority: Major
> Attachments: image-2024-05-09-14-06-58-007.png, 
> image-2024-05-09-14-09-38-453.png, image-2024-05-09-14-11-38-476.png, 
> image-2024-05-09-14-22-14-417.png, image-2024-05-09-14-22-59-370.png, 
> image-2024-05-09-18-52-03-741.png, image-2024-05-09-18-52-28-584.png
>
>
> In our scenario, we have an Iceberg table that contains a column named 'time' 
> of the {{timestamptz}} data type. This column has 10 rows of data where the 
> 'time' value is {{'2024-04-30 07:00:00'}} expressed in the "Asia/Shanghai" 
> timezone.
> !image-2024-05-09-14-06-58-007.png!
>  
> We encountered a strange phenomenon when accessing the table using 
> Iceberg-flink.
> When the {{WHERE}} clause includes the {{time}} column, the results are 
> incorrect.
> ZoneId.{_}systemDefault{_}() = "Asia/Shanghai" 
> !image-2024-05-09-18-52-03-741.png!
> When there is no {{WHERE}} clause, the results are correct.
> !image-2024-05-09-18-52-28-584.png!
> During debugging, we found that when a {{WHERE}} clause is present, a 
> {{FilterPushDownSpec}} is generated, and this {{FilterPushDownSpec}} utilizes 
> {{RexNodeToExpressionConverter}} for translation.
> !image-2024-05-09-14-11-38-476.png!
> !image-2024-05-09-14-22-59-370.png!
> When {{RexNodeToExpressionConverter#visitLiteral}} encounters a 
> {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type, it uses the specified timezone 
> "Asia/Shanghai" to convert the {{TimestampString}} type to an {{Instant}} 
> type. However, the upstream {{TimestampString}} data has already been 
> processed in UTC timezone. By applying the local timezone processing here, an 
> error occurs due to the mismatch in timezones.
> The question is whether the handling of {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} data in 
> {{RexNodeToExpressionConverter#visitLiteral}} is a bug, and whether it should 
> process the data in the UTC timezone.
>  
> Please help confirm if this is the issue, and if so, we can submit a patch to 
> fix it.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35327) SQL Explain show push down condition

2024-05-10 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845395#comment-17845395
 ] 

lincoln lee commented on FLINK-35327:
-

Hi [~loserwang1024], thanks for reporting this! Assigned to [~xuyangzhong].

> SQL Explain show push down condition 
> -
>
> Key: FLINK-35327
> URL: https://issues.apache.org/jira/browse/FLINK-35327
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Hongshun Wang
>Assignee: xuyang
>Priority: Minor
> Fix For: 1.20.0
>
>
> Currently, we cannot determine whether a filter/limit/partition condition is 
> pushed down to the source. For example, we can only know a filter condition is 
> pushed down if it is no longer included in the Filter node.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35327) SQL Explain show push down condition

2024-05-10 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-35327:
---

Assignee: xuyang

> SQL Explain show push down condition 
> -
>
> Key: FLINK-35327
> URL: https://issues.apache.org/jira/browse/FLINK-35327
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Hongshun Wang
>Assignee: xuyang
>Priority: Minor
> Fix For: 1.20.0
>
>
> Currently, we cannot determine whether a filter/limit/partition condition is 
> pushed down to the source. For example, we can only know a filter condition is 
> pushed down if it is no longer included in the Filter node.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33759] flink parquet writer support write nested array or map type [flink]

2024-05-10 Thread via GitHub


xccui commented on PR #23881:
URL: https://github.com/apache/flink/pull/23881#issuecomment-2104688811

   Hi @LoveHeat, can you cherry-pick the change in 
https://github.com/apache/flink/pull/24029 from @ukby1234 and merge the two PRs?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-10 Thread via GitHub


xuyangzhong commented on PR #24760:
URL: https://github.com/apache/flink/pull/24760#issuecomment-2104649932

   Hi, @lsyldliu all comments have been addressed. Can you take a look again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]

2024-05-10 Thread via GitHub


xuyangzhong commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1596779798


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##
@@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() {
 "Only support create materialized table in continuous 
refresh mode currently.");
 }
 
+@Test
+void testAlterMaterializedTableRefresh() throws Exception {
+// initialize session handle, create test-filesystem catalog and 
register it to catalog
+// store
+SessionHandle sessionHandle = initializeSession();
+
+String materializedTableDDL =
+"CREATE MATERIALIZED TABLE users_shops"
++ " PARTITIONED BY (ds)\n"
++ " WITH(\n"
++ "   'format' = 'debezium-json'\n"
++ " )\n"
++ " FRESHNESS = INTERVAL '30' SECOND\n"
++ " AS SELECT \n"
++ "  user_id,\n"
++ "  shop_id,\n"
++ "  ds,\n"

Review Comment:
   Use a values source to verify that the data is overwritten.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]

2024-05-10 Thread via GitHub


empathy87 commented on PR #24659:
URL: https://github.com/apache/flink/pull/24659#issuecomment-2104557693

   > Thanks, let's wait for ci
   > 
   > btw, @empathy87 could you please create backport PRs with this fix for 
1.19 and 1.18?
   
   Yeh, sure, I will do backports!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35323) Only the schema of the first hit table is recorded when the source-table of the transformer hits multiple tables

2024-05-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35323:
---
Labels: pull-request-available  (was: )

> Only the schema of the first hit table is recorded when the source-table of 
> the transformer hits multiple tables
> 
>
> Key: FLINK-35323
> URL: https://issues.apache.org/jira/browse/FLINK-35323
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: 3.1.0
>Reporter: Wenkai Qi
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> {code:java}
> transform:
>   - source-table: mydb.web_\.*
> projection: \*, localtimestamp as new_timestamp
>    description: project fields from source table {code}
> Table mydb.web_order: col1, col2, col3
> Table mydb.web_info: col1, col4
> If the transform data operator processes `mydb.web_info` first and then 
> `mydb.web_order`, its schema will always be `col1, col4`.
> Caused by: TransformDataOperator.java
> {code:java}
> private transient Map
> transformProjectionProcessorMap;
> private transient Map 
> transformFilterProcessorMap; {code}
> The relationship of `TableId` is missing here.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35323][runtime] Fix transform failure when one rule matches multiple schemas [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian commented on PR #3312:
URL: https://github.com/apache/flink-cdc/pull/3312#issuecomment-2104532312

   cc @aiwenmo


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35302][rest] Ignore unknown fields in REST request deserialization [flink]

2024-05-10 Thread via GitHub


gaborgsomogyi commented on PR #24759:
URL: https://github.com/apache/flink/pull/24759#issuecomment-2104485093

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]

2024-05-10 Thread via GitHub


liuyongvs commented on PR #24526:
URL: https://github.com/apache/flink/pull/24526#issuecomment-2104468195

   hi @dawidwys @snuyanzin WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][tests] Fix occasional pipeline E2e testcases failure [flink-cdc]

2024-05-10 Thread via GitHub


lvyanquan commented on PR #3309:
URL: https://github.com/apache/flink-cdc/pull/3309#issuecomment-2104448042

   LGTM, waiting for CI pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-05-10 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-35041:
---

Assignee: Rui Fan  (was: Feifan Wang)

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35030][runtime] Introduce Epoch Manager under async execution [flink]

2024-05-10 Thread via GitHub


fredia commented on code in PR #24748:
URL: https://github.com/apache/flink/pull/24748#discussion_r1596619045


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java:
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedList;
+
+/**
+ * Epoch manager segments inputs into distinct epochs, marked by the arrival of non-records (e.g.
+ * watermarks, record attributes). Records are assigned to a unique epoch based on their arrival;
+ * records within an epoch are allowed to be parallelized, while the non-record of an epoch can only
+ * be executed when all records in this epoch have finished.
+ *
+ * For more details please refer to FLIP-425.
+ */
+public class EpochManager {
+
+/**
+ * This enum defines whether parallel execution between epochs is allowed. 
We should keep this
+ * internal and away from API module for now, until we could see the 
concrete need for {@link
+ * #PARALLEL_BETWEEN_EPOCH} from average users.
+ */
+public enum ParallelMode {
+/**
+ * Subsequent epochs must wait until the previous epoch is completed 
before they can start.
+ */
+SERIAL_BETWEEN_EPOCH,
+/**
+ * Subsequent epochs can begin execution even if the previous epoch 
has not yet completed.
+ * Usually performs better than {@link #SERIAL_BETWEEN_EPOCH}.
+ */
+PARALLEL_BETWEEN_EPOCH
+}
+
+/**
+ * The reference to the {@link AsyncExecutionController}, used for {@link
+ * ParallelMode#SERIAL_BETWEEN_EPOCH}. Can be null when testing.
+ */
+final AsyncExecutionController asyncExecutionController;
+
+/** The number of epochs that have arrived. */
+long epochNum;
+
+/** The output queue to hold ongoing epochs. */
+LinkedList outputQueue;
+
+/** Current active epoch, only one active epoch at the same time. */
+Epoch activeEpoch;
+
+public EpochManager(AsyncExecutionController aec) {
+this.epochNum = 0;
+this.outputQueue = new LinkedList<>();
+// preset an empty epoch, the epoch action will be updated when 
non-record is received.
+this.activeEpoch = new Epoch(epochNum++);
+this.outputQueue.add(activeEpoch);
+this.asyncExecutionController = aec;
+}
+
+/**
+ * Add a record to the current epoch and return the current open epoch, 
the epoch will be
+ * associated with the {@link RecordContext} of this record. Must be 
invoked within task thread.
+ *
+ * @return the current open epoch.
+ */
+public Epoch onRecord() {
+activeEpoch.ongoingRecordCount++;
+return activeEpoch;
+}
+
+/**
+ * Add a non-record to the current epoch, close current epoch and open a 
new epoch. Must be
+ * invoked within task thread.
+ *
+ * @param action the action associated with this non-record.
+ * @param parallelMode the parallel mode for this epoch.
+ */
+public void onNonRecord(Runnable action, ParallelMode parallelMode) {

Review Comment:
   Because there is no exception in the signature of `disposeContext()`, if we change Runnable to ThrowingRunnable here, we would need to catch the exception in `aec.disposeContext()` or `epochManager.completeOneRecord()`.
   
   I think catching exceptions in `StreamOperator` is more convenient.
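
   For illustration, a minimal sketch of that trade-off (the names are made up, not the actual Flink utility classes): a throwing callback wrapped into a plain Runnable, so the exception surfaces as an unchecked one wherever the caller decides to handle it.

   ```java
   @FunctionalInterface
   interface ThrowingAction<E extends Exception> {
       void run() throws E;
   }

   final class Callbacks {
       private Callbacks() {}

       static Runnable unchecked(ThrowingAction<? extends Exception> action) {
           return () -> {
               try {
                   action.run();
               } catch (Exception e) {
                   // Rethrow unchecked so intermediate call sites do not need to
                   // declare checked exceptions in their signatures.
                   throw new RuntimeException(e);
               }
           };
       }
   }
   ```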



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -311,6 +318,10 @@ public void drainInflightRecords(int targetNum) {
 }
 }
 
+public EpochManager getEpochManager() {
+return epochManager;

Review Comment:
   👍 Thanks for the suggestion. I added `processNonRecord()` to the AEC, so `EpochManager` is no longer visible to operators.



##
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work 

Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian commented on PR #3310:
URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2104414800

   @leonardBang @GOODBOY008 PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]

2024-05-10 Thread via GitHub


flinkbot commented on PR #24770:
URL: https://github.com/apache/flink/pull/24770#issuecomment-2104413730

   
   ## CI report:
   
   * 82bb2cc9da16f2a3b7c1293c78412d503ca3ea80 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]

2024-05-10 Thread via GitHub


1996fanrui commented on PR #24770:
URL: https://github.com/apache/flink/pull/24770#issuecomment-2104409349

   Hi @masteryhx  @Zakelly , would you mind helping review this PR in your free 
time?
   
   It's a blocker of flink 1.20, and IIUC it's caused by 
https://github.com/apache/flink/pull/24480.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-05-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35041:
---
Labels: pull-request-available  (was: )

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Feifan Wang
>Priority: Blocker
>  Labels: pull-request-available
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]

2024-05-10 Thread via GitHub


1996fanrui opened a new pull request, #24770:
URL: https://github.com/apache/flink/pull/24770

   
   ## What is the purpose of the change
   
   TestingSegmentFileStateHandle.EMPTY_INSTANCE is static, which means other tests can update its `disposed` flag.
   
   A TestingSegmentFileStateHandle object shouldn't be shared in global scope.
   
   
   
   
   ## Brief change log
   
   
   [FLINK-35041][test] Fix the 
IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35322][Connectors/Google PubSub] Remove weekly tests of 1.19 for unsupporting version v3.0 [flink-connector-gcp-pubsub]

2024-05-10 Thread via GitHub


vahmed-hamdy commented on PR #26:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/26#issuecomment-2104372982

   @snuyanzin Could you please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35322][Connectors/Google PubSub] Remove weekly tests of 1.19 for unsupporting version v3.0 [flink-connector-gcp-pubsub]

2024-05-10 Thread via GitHub


boring-cyborg[bot] commented on PR #26:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/26#issuecomment-2104371048

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35322) PubSub Connector Weekly build fails

2024-05-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35322:
---
Labels: pull-request-available test-stability  (was: test-stability)

> PubSub Connector Weekly build fails 
> 
>
> Key: FLINK-35322
> URL: https://issues.apache.org/jira/browse/FLINK-35322
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Google Cloud PubSub
>Affects Versions: 3.1.0
>Reporter: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available, test-stability
>
> Weekly builds for the GCP PubSub connector are failing for 1.19 due to a 
> compilation error in tests.
> https://github.com/apache/flink-connector-gcp-pubsub/actions/runs/8768752932/job/24063472769
> https://github.com/apache/flink-connector-gcp-pubsub/actions/runs/8863605354
> https://github.com/apache/flink-connector-gcp-pubsub/actions/runs/8954270618



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35293][hive] Hive source supports dynamic parallelism inference [flink]

2024-05-10 Thread via GitHub


SinBex commented on code in PR #24764:
URL: https://github.com/apache/flink/pull/24764#discussion_r1596561578


##
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * The factory class for {@link HiveParallelismInference} to support Hive 
source dynamic parallelism
+ * inference.
+ */
+class HiveDynamicParallelismInferenceFactory implements 
HiveParallelismInference.Provider {
+
+private final ObjectPath tablePath;
+private final JobConf jobConf;
+private final int globalMaxParallelism;
+
+HiveDynamicParallelismInferenceFactory(
+ObjectPath tablePath, JobConf jobConf, int globalMaxParallelism) {
+this.tablePath = tablePath;
+this.jobConf = jobConf;
+this.globalMaxParallelism = globalMaxParallelism;
+}
+
+@Override
+public HiveParallelismInference create() {
+boolean inferEnabled =
+jobConf.getBoolean(
+
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(),
+
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.defaultValue());
+HiveOptions.InferMode inferMode =
+jobConf.getEnum(
+
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(),
+
HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.defaultValue());

Review Comment:
   I overlooked this issue during testing. Thank you very much for pointing it 
out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35306) Flink cannot compile with jdk17

2024-05-10 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845275#comment-17845275
 ] 

Ryan Skraba commented on FLINK-35306:
-

Thanks for the quick fix!  Just to document this, we saw the compilation fail 
on GitHub Actions too:
* 1.20 Java 17 / Compile 
https://github.com/apache/flink/commit/29736b8c01924b7da03d4bcbfd9c812a8e5a08b4/checks/24709533133/logs

> Flink cannot compile with jdk17
> ---
>
> Key: FLINK-35306
> URL: https://issues.apache.org/jira/browse/FLINK-35306
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Tests
>Affects Versions: 1.20.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-05-08-11-48-04-161.png
>
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59380&view=results]
>  fails and benchmark with 17 fails as well
>  
> Reason: TypeSerializerUpgradeTestBase.UpgradeVerifier renamed the 
> schemaCompatibilityMatcher method to schemaCompatibilityCondition, but 
> some subclasses didn't follow the rename, such as 
> PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordVerifier.
>  
> It belongs to the flink-tests-java17 module, which isn't compiled by default.
>  
> it's caused by
>  * https://issues.apache.org/jira/browse/FLINK-25537
>  * [https://github.com/apache/flink/pull/24603]
>  
> !image-2024-05-08-11-48-04-161.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-05-10 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845274#comment-17845274
 ] 

Ryan Skraba commented on FLINK-35041:
-

* 1.20 Hadoop 3.1.3 / Test (module: core) 
https://github.com/apache/flink/actions/runs/9026237714/job/24803537384#step:10:8419
* 1.20 Java 21 / Test (module: core) 
https://github.com/apache/flink/actions/runs/9011311875/job/24758973855#step:10:8334
* 1.20 Default (Java 8) / Test (module: core) 
https://github.com/apache/flink/actions/runs/8999811164/job/24723153060#step:10:8487
* 1.20 Default (Java 8) / Test (module: core) 
https://github.com/apache/flink/actions/runs/8997755665/job/24716975457#step:10:9046
* 1.20 Java 11 / Test (module: core) 
https://github.com/apache/flink/actions/runs/8995101420/job/24709819637#step:10:8738
* 1.20 Java 21 / Test (module: core) 
https://github.com/apache/flink/actions/runs/8995101420/job/24709801069#step:10:8940
* 1.20 Default (Java 8) / Test (module: core) 
https://github.com/apache/flink/actions/runs/8985327313/job/24679590248#step:10:8686


> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Feifan Wang
>Priority: Blocker
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35002) GitHub action request timeout to ArtifactService

2024-05-10 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845273#comment-17845273
 ] 

Ryan Skraba commented on FLINK-35002:
-

* 1.19 Java 11 / Compile 
https://github.com/apache/flink/commit/fa426f104baa1343a07695dcf4c4984814f0fde4/checks/24803211419/logs
* 1.18 Java 11 / Test (module: connect) 
https://github.com/apache/flink/commit/9d0858ee745bc835efa78a34d849d5f3ecb89f6d/checks/24709868165/logs


> GitHub action request timeout  to ArtifactService
> -
>
> Key: FLINK-35002
> URL: https://issues.apache.org/jira/browse/FLINK-35002
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ryan Skraba
>Priority: Major
>  Labels: github-actions, test-stability
>
> A timeout can occur when uploading a successfully built artifact:
>  * [https://github.com/apache/flink/actions/runs/8516411871/job/23325392650]
> {code:java}
> 2024-04-02T02:20:15.6355368Z With the provided path, there will be 1 file 
> uploaded
> 2024-04-02T02:20:15.6360133Z Artifact name is valid!
> 2024-04-02T02:20:15.6362872Z Root directory input is valid!
> 2024-04-02T02:20:20.6975036Z Attempt 1 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 3000 ms...
> 2024-04-02T02:20:28.7084937Z Attempt 2 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 4785 ms...
> 2024-04-02T02:20:38.5015936Z Attempt 3 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 7375 ms...
> 2024-04-02T02:20:50.8901508Z Attempt 4 of 5 failed with error: Request 
> timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. 
> Retrying request in 14988 ms...
> 2024-04-02T02:21:10.9028438Z ##[error]Failed to CreateArtifact: Failed to 
> make request after 5 attempts: Request timeout: 
> /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact
> 2024-04-02T02:22:59.9893296Z Post job cleanup.
> 2024-04-02T02:22:59.9958844Z Post job cleanup. {code}
> (This is unlikely to be something we can fix, but we can track it.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34404) GroupWindowAggregateProcTimeRestoreTest#testRestore times out

2024-05-10 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845271#comment-17845271
 ] 

Ryan Skraba commented on FLINK-34404:
-

(Using the same command line as above)
* 1.20 Default (Java 8) / Test (module: table) 
https://github.com/apache/flink/actions/runs/8999811164/job/24723153970#step:10:12716


> GroupWindowAggregateProcTimeRestoreTest#testRestore times out
> -
>
> Key: FLINK-34404
> URL: https://issues.apache.org/jira/browse/FLINK-34404
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Matthias Pohl
>Assignee: Alan Sheinberg
>Priority: Critical
>  Labels: test-stability
> Attachments: FLINK-34404.failure.log, FLINK-34404.success.log
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57357&view=logs&j=32715a4c-21b8-59a3-4171-744e5ab107eb&t=ff64056b-5320-5afe-c22c-6fa339e59586&l=11603
> {code}
> Feb 07 02:17:40 "ForkJoinPool-74-worker-1" #382 daemon prio=5 os_prio=0 
> cpu=282.22ms elapsed=961.78s tid=0x7f880a485c00 nid=0x6745 waiting on 
> condition  [0x7f878a6f9000]
> Feb 07 02:17:40java.lang.Thread.State: WAITING (parking)
> Feb 07 02:17:40   at 
> jdk.internal.misc.Unsafe.park(java.base@17.0.7/Native Method)
> Feb 07 02:17:40   - parking to wait for  <0xff73d060> (a 
> java.util.concurrent.CompletableFuture$Signaller)
> Feb 07 02:17:40   at 
> java.util.concurrent.locks.LockSupport.park(java.base@17.0.7/LockSupport.java:211)
> Feb 07 02:17:40   at 
> java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.7/CompletableFuture.java:1864)
> Feb 07 02:17:40   at 
> java.util.concurrent.ForkJoinPool.compensatedBlock(java.base@17.0.7/ForkJoinPool.java:3449)
> Feb 07 02:17:40   at 
> java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.7/ForkJoinPool.java:3432)
> Feb 07 02:17:40   at 
> java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.7/CompletableFuture.java:1898)
> Feb 07 02:17:40   at 
> java.util.concurrent.CompletableFuture.get(java.base@17.0.7/CompletableFuture.java:2072)
> Feb 07 02:17:40   at 
> org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase.testRestore(RestoreTestBase.java:292)
> Feb 07 02:17:40   at 
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@17.0.7/Native 
> Method)
> Feb 07 02:17:40   at 
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@17.0.7/NativeMethodAccessorImpl.java:77)
> Feb 07 02:17:40   at 
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@17.0.7/DelegatingMethodAccessorImpl.java:43)
> Feb 07 02:17:40   at 
> java.lang.reflect.Method.invoke(java.base@17.0.7/Method.java:568)
> Feb 07 02:17:40   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34273) git fetch fails

2024-05-10 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845270#comment-17845270
 ] 

Ryan Skraba commented on FLINK-34273:
-

* 1.20 test_cron_hadoop313 connect 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59424&view=logs&j=b6f8a893-8f59-51d5-fe28-fb56a8b0932c&t=a2aa31b1-3076-5dd3-ea01-4a81e1467181&l=384
* 1.20 test_ci misc 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59405&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=10163a1a-ea71-5414-a832-7701bff37ba3&l=380


> git fetch fails
> ---
>
> Key: FLINK-34273
> URL: https://issues.apache.org/jira/browse/FLINK-34273
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Test Infrastructure
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> We've seen multiple {{git fetch}} failures. I assume this to be an 
> infrastructure issue. This Jira issue is for documentation purposes.
> {code:java}
> error: RPC failed; curl 18 transfer closed with outstanding read data 
> remaining
> error: 5211 bytes of body are still expected
> fetch-pack: unexpected disconnect while reading sideband packet
> fatal: early EOF
> fatal: fetch-pack: invalid index-pack output {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=5d6dc3d3-393d-5111-3a40-c6a5a36202e6&l=667



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34224) ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest timed out

2024-05-10 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845269#comment-17845269
 ] 

Ryan Skraba commented on FLINK-34224:
-

* 1.18 Hadoop 3.1.3 / Test (module: core) 
https://github.com/apache/flink/actions/runs/9011311755/job/24759083100#step:10:10641

> ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest 
> timed out
> ---
>
> Key: FLINK-34224
> URL: https://issues.apache.org/jira/browse/FLINK-34224
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: github-actions, test-stability
>
> The timeout appeared in the GitHub Actions workflow (currently in test phase; 
> [FLIP-396|https://cwiki.apache.org/confluence/display/FLINK/FLIP-396%3A+Trial+to+test+GitHub+Actions+as+an+alternative+for+Flink%27s+current+Azure+CI+infrastructure]):
> https://github.com/XComp/flink/actions/runs/7632434859/job/20793613726#step:10:11040
> {code}
> Jan 24 01:38:36 "ForkJoinPool-1-worker-1" #16 daemon prio=5 os_prio=0 
> tid=0x7f3b200ae800 nid=0x406e3 waiting on condition [0x7f3b1ba0e000]
> Jan 24 01:38:36java.lang.Thread.State: WAITING (parking)
> Jan 24 01:38:36   at sun.misc.Unsafe.park(Native Method)
> Jan 24 01:38:36   - parking to wait for  <0xdfbbb358> (a 
> java.util.concurrent.CompletableFuture$Signaller)
> Jan 24 01:38:36   at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> Jan 24 01:38:36   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> Jan 24 01:38:36   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3313)
> Jan 24 01:38:36   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> Jan 24 01:38:36   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> Jan 24 01:38:36   at 
> org.apache.flink.changelog.fs.ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest.java:251)
> Jan 24 01:38:36   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26644) python StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies failed on azure

2024-05-10 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845268#comment-17845268
 ] 

Ryan Skraba commented on FLINK-26644:
-

* 1.20 Java 11 / Test (module: python) 
https://github.com/apache/flink/actions/runs/9011311875/job/24758993707#step:10:25072

> python 
> StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies 
> failed on azure
> ---
>
> Key: FLINK-26644
> URL: https://issues.apache.org/jira/browse/FLINK-26644
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.14.4, 1.15.0, 1.16.0, 1.19.0
>Reporter: Yun Gao
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> {code:java}
> 2022-03-14T18:50:24.6842853Z Mar 14 18:50:24 
> === FAILURES 
> ===
> 2022-03-14T18:50:24.6844089Z Mar 14 18:50:24 _ 
> StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies _
> 2022-03-14T18:50:24.6844846Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6846063Z Mar 14 18:50:24 self = 
>   testMethod=test_generate_stream_graph_with_dependencies>
> 2022-03-14T18:50:24.6847104Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6847766Z Mar 14 18:50:24 def 
> test_generate_stream_graph_with_dependencies(self):
> 2022-03-14T18:50:24.6848677Z Mar 14 18:50:24 python_file_dir = 
> os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
> 2022-03-14T18:50:24.6849833Z Mar 14 18:50:24 os.mkdir(python_file_dir)
> 2022-03-14T18:50:24.6850729Z Mar 14 18:50:24 python_file_path = 
> os.path.join(python_file_dir, "test_stream_dependency_manage_lib.py")
> 2022-03-14T18:50:24.6852679Z Mar 14 18:50:24 with 
> open(python_file_path, 'w') as f:
> 2022-03-14T18:50:24.6853646Z Mar 14 18:50:24 f.write("def 
> add_two(a):\nreturn a + 2")
> 2022-03-14T18:50:24.6854394Z Mar 14 18:50:24 env = self.env
> 2022-03-14T18:50:24.6855019Z Mar 14 18:50:24 
> env.add_python_file(python_file_path)
> 2022-03-14T18:50:24.6855519Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6856254Z Mar 14 18:50:24 def plus_two_map(value):
> 2022-03-14T18:50:24.6857045Z Mar 14 18:50:24 from 
> test_stream_dependency_manage_lib import add_two
> 2022-03-14T18:50:24.6857865Z Mar 14 18:50:24 return value[0], 
> add_two(value[1])
> 2022-03-14T18:50:24.6858466Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6858924Z Mar 14 18:50:24 def add_from_file(i):
> 2022-03-14T18:50:24.6859806Z Mar 14 18:50:24 with 
> open("data/data.txt", 'r') as f:
> 2022-03-14T18:50:24.6860266Z Mar 14 18:50:24 return i[0], 
> i[1] + int(f.read())
> 2022-03-14T18:50:24.6860879Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6862022Z Mar 14 18:50:24 from_collection_source = 
> env.from_collection([('a', 0), ('b', 0), ('c', 1), ('d', 1),
> 2022-03-14T18:50:24.6863259Z Mar 14 18:50:24  
>  ('e', 2)],
> 2022-03-14T18:50:24.6864057Z Mar 14 18:50:24  
> type_info=Types.ROW([Types.STRING(),
> 2022-03-14T18:50:24.6864651Z Mar 14 18:50:24  
>  Types.INT()]))
> 2022-03-14T18:50:24.6865150Z Mar 14 18:50:24 
> from_collection_source.name("From Collection")
> 2022-03-14T18:50:24.6866212Z Mar 14 18:50:24 keyed_stream = 
> from_collection_source.key_by(lambda x: x[1], key_type=Types.INT())
> 2022-03-14T18:50:24.6867083Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6867793Z Mar 14 18:50:24 plus_two_map_stream = 
> keyed_stream.map(plus_two_map).name("Plus Two Map").set_parallelism(3)
> 2022-03-14T18:50:24.6868620Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6869412Z Mar 14 18:50:24 add_from_file_map = 
> plus_two_map_stream.map(add_from_file).name("Add From File Map")
> 2022-03-14T18:50:24.6870239Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6870883Z Mar 14 18:50:24 test_stream_sink = 
> add_from_file_map.add_sink(self.test_sink).name("Test Sink")
> 2022-03-14T18:50:24.6871803Z Mar 14 18:50:24 
> test_stream_sink.set_parallelism(4)
> 2022-03-14T18:50:24.6872291Z Mar 14 18:50:24 
> 2022-03-14T18:50:24.6872756Z Mar 14 18:50:24 archive_dir_path = 
> os.path.join(self.tempdir, "archive_" + str(uuid.uuid4()))
> 2022-03-14T18:50:24.6873557Z Mar 14 18:50:24 
> os.mkdir(archive_dir_path)
> 2022-03-14T18:50:24.6874817Z Mar 14 18:50:24 with 
> open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
> 2022-03-14T18:50:24.6875414Z Mar 14 18:50:24 f.write("3")
> 2022-03-14T18:50:24.68

[jira] [Commented] (FLINK-35237) Allow Sink to Choose HashFunction in PrePartitionOperator

2024-05-10 Thread zhangdingxin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845267#comment-17845267
 ] 

zhangdingxin commented on FLINK-35237:
--

If the maintainers agree that this improvement is worthwhile, I would be happy 
to take it on. Please feel free to assign the issue to me.

> Allow Sink to Choose HashFunction in PrePartitionOperator
> -
>
> Key: FLINK-35237
> URL: https://issues.apache.org/jira/browse/FLINK-35237
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: zhangdingxin
>Priority: Major
>
> The {{PrePartitionOperator}} in its current implementation only supports a 
> fixed {{HashFunction}} 
> ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}).
>  This limits the ability of Sink implementations to customize the 
> partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of 
> partitioned tables, it would be advantageous to allow hashing based on 
> partition keys, hashing according to table names, or using the database 
> engine's internal primary key hash functions (such as with MaxCompute 
> DataSink).
> When users require such custom partitioning logic, they are compelled to 
> implement their PartitionOperator, which undermines the utility of 
> {{{}PrePartitionOperator{}}}.
> To address this limitation, it would be highly desirable to enable the 
> {{PrePartitionOperator}} to support user-specified custom 
> {{{}HashFunction{}}}s (Function). A possible 
> solution could involve a mechanism analogous to the {{DataSink}} interface, 
> allowing the specification of a {{HashFunctionProvider}} class path in the 
> configuration file. This enhancement would greatly facilitate users in 
> tailoring partition strategies to meet their specific application needs.
> In this case, I want to create new class {{HashFunctionProvider}} and 
> {{{}HashFunction{}}}:
> {code:java}
> public interface HashFunctionProvider {
> HashFunction getHashFunction(Schema schema);
> }
> public interface HashFunction extends Function {
> Integer apply(DataChangeEvent event);
> } {code}
> add {{getHashFunctionProvider}} method to {{DataSink}}
>  
> {code:java}
> public interface DataSink {
> /** Get the {@link EventSinkProvider} for writing changed data to 
> external systems. */
> EventSinkProvider getEventSinkProvider();
> /** Get the {@link MetadataApplier} for applying metadata changes to 
> external systems. */
> MetadataApplier getMetadataApplier();
> default HashFunctionProvider getHashFunctionProvider() {
> return new DefaultHashFunctionProvider();
> }
> } {code}
> and re-implement {{PrePartitionOperator}} {{recreateHashFunction}} method.
> {code:java}
> private HashFunction recreateHashFunction(TableId tableId) {
> return 
> hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId));
> } {code}
>  
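
A hedged usage sketch of the proposed interfaces quoted above (only {{HashFunctionProvider}}/{{HashFunction}} come from the proposal; the provider class, its name, and the key-extraction logic are illustrative assumptions, and the CDC event/schema types are referenced by name only):
{code:java}
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;

// Illustrative only: a sink-specific provider built on the proposed interfaces.
public class PartitionKeyHashFunctionProvider implements HashFunctionProvider {

    @Override
    public HashFunction getHashFunction(Schema schema) {
        // Hash on the table id plus a (placeholder) partition key so that all
        // changes of the same partition route to the same downstream subtask.
        return event -> 31 * event.tableId().hashCode() + partitionKey(event, schema).hashCode();
    }

    private static String partitionKey(DataChangeEvent event, Schema schema) {
        // Placeholder: a real sink would derive this from its partition columns.
        return String.valueOf(event.after());
    }
}
{code}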



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]

2024-05-10 Thread via GitHub


snuyanzin commented on PR #24659:
URL: https://github.com/apache/flink/pull/24659#issuecomment-2104255794

   Thanks, let's wait for CI.
   
   BTW, @empathy87 could you please create backport PRs with this fix for 1.19 
and 1.18?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish [flink]

2024-05-10 Thread via GitHub


1996fanrui commented on code in PR #24757:
URL: https://github.com/apache/flink/pull/24757#discussion_r1596434203


##
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##
@@ -201,8 +201,11 @@ void announceCombinedWatermark() {
 // to ready task to avoid period task fail (Java-ThreadPoolExecutor 
will not schedule
 // the period task if it throws an exception).
 for (Integer subtaskId : subTaskIds) {
-context.sendEventToSourceOperatorIfTaskReady(
-subtaskId, new 
WatermarkAlignmentEvent(maxAllowedWatermark));
+// when subtask have been finished, do not send event.
+if (!context.hasNoMoreSplits(subtaskId)) {

Review Comment:
   Does `NoMoreSplits` mean the subtask has finished? 
   
   I'm afraid they are not the same. For example, a subtask may not have any split 
when the parallelism of the Kafka source is greater than the number of Kafka partitions.
   
   I'm not sure we need this change. IIUC, 
`context.sendEventToSourceOperatorIfTaskReady` already has a check: it only sends the event 
when the `gateway` is not null. 
   
   That means the coordinator doesn't send events to a subtask once that subtask has finished, 
right?
   
   Please correct me if anything is wrong, thanks
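
   A minimal, hedged sketch of the two checks being compared (the `Context` interface 
below is a stand-in, not Flink's real `SourceCoordinatorContext`; `isTaskReady` models 
the "gateway is not null" behaviour assumed above):
   ```java
   import java.util.List;

   // Hedged sketch only: stand-in types, not Flink's internal classes.
   class WatermarkAnnouncementSketch {

       interface Context {
           boolean hasNoMoreSplits(int subtaskId);   // condition added by this PR
           boolean isTaskReady(int subtaskId);       // models the "gateway != null" check
           void sendWatermarkAlignmentEvent(int subtaskId, long maxAllowedWatermark);
       }

       static void announceCombinedWatermark(
               Context context, List<Integer> subTaskIds, long maxAllowedWatermark) {
           for (int subtaskId : subTaskIds) {
               // PR change: skip subtasks that were signalled NoMoreSplits ...
               if (context.hasNoMoreSplits(subtaskId)) {
                   continue;
               }
               // ... while the existing readiness check may already prevent events
               // from being sent to finished subtasks whose gateway is gone.
               if (context.isTaskReady(subtaskId)) {
                   context.sendWatermarkAlignmentEvent(subtaskId, maxAllowedWatermark);
               }
           }
       }
   }
   ```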



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35309) Enable Notice file ci check and fix Notice

2024-05-10 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845257#comment-17845257
 ] 

Qingsheng Ren commented on FLINK-35309:
---

CDC release-3.1: 
e452d66b3bfabbab9875d8d5be6f6262749ff30f..62cc62ef722fbe47277704e45166aefaee8d1ac4

> Enable Notice file ci check and fix Notice 
> ---
>
> Key: FLINK-35309
> URL: https://issues.apache.org/jira/browse/FLINK-35309
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: cdc-3.1.0
>
>
> Changes:
> * Add ci to check Notice file 
> * Fix Notice file issue



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]

2024-05-10 Thread via GitHub


empathy87 commented on code in PR #24659:
URL: https://github.com/apache/flink/pull/24659#discussion_r1596500029


##
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java:
##
@@ -86,4 +83,49 @@ public void testApplyPredicate() {
 OrcFilters.Predicate predicate8 = new OrcFilters.And(predicate4, 
predicate6);
 assertThat(predicate7).hasToString(predicate8.toString());
 }
+
+@Test
+@SuppressWarnings("unchecked")
+public void testApplyPredicateReverse() {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35309][cdc] Fix license and NOTICE files and add CI check in Flink CDC [flink-cdc]

2024-05-10 Thread via GitHub


PatrickRen merged PR #3307:
URL: https://github.com/apache/flink-cdc/pull/3307


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35109) Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support for 1.17 and 1.18

2024-05-10 Thread Ufuk Celebi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi reassigned FLINK-35109:
---

Assignee: Fabian Paul  (was: Ufuk Celebi)

> Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support 
> for 1.17 and 1.18
> ---
>
> Key: FLINK-35109
> URL: https://issues.apache.org/jira/browse/FLINK-35109
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Fabian Paul
>Priority: Blocker
> Fix For: kafka-4.0.0
>
>
> The Flink Kafka connector currently can't compile against Flink 
> 1.20-SNAPSHOT. An example failure can be found at 
> https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169
> The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, 
> see FLINK-32455. See also specifically the comment in 
> https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785
> Next to that, there's also FLINK-25509 which can only be supported with Flink 
> 1.19 and higher. 
> So we should:
> * Drop support for 1.17 and 1.18
> * Refactor the Flink Kafka connector to use the new 
> {code:java}MigrationTest{code}
> We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; 
> this change will be a new v4.0 version with support for Flink 1.19 and the 
> upcoming Flink 1.20



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35237) Allow Sink to Choose HashFunction in PrePartitionOperato

2024-05-10 Thread zhangdingxin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangdingxin updated FLINK-35237:
-
Summary: Allow Sink to Choose HashFunction in PrePartitionOperato  (was: 
Allow Sink to Choose HashFunction in PrePartitionOperator for Flink Sink 
Customization)

> Allow Sink to Choose HashFunction in PrePartitionOperato
> 
>
> Key: FLINK-35237
> URL: https://issues.apache.org/jira/browse/FLINK-35237
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: zhangdingxin
>Priority: Major
>
> The {{PrePartitionOperator}} in its current implementation only supports a 
> fixed {{HashFunction}} 
> ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}).
>  This limits the ability of Sink implementations to customize the 
> partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of 
> partitioned tables, it would be advantageous to allow hashing based on 
> partition keys, hashing according to table names, or using the database 
> engine's internal primary key hash functions (such as with MaxCompute 
> DataSink).
> When users require such custom partitioning logic, they are compelled to 
> implement their PartitionOperator, which undermines the utility of 
> {{{}PrePartitionOperator{}}}.
> To address this limitation, it would be highly desirable to enable the 
> {{PrePartitionOperator}} to support user-specified custom 
> {{{}HashFunction{}}}s (Function). A possible 
> solution could involve a mechanism analogous to the {{DataSink}} interface, 
> allowing the specification of a {{HashFunctionProvider}} class path in the 
> configuration file. This enhancement would greatly facilitate users in 
> tailoring partition strategies to meet their specific application needs.
> In this case, I want to create new class {{HashFunctionProvider}} and 
> {{{}HashFunction{}}}:
> {code:java}
> public interface HashFunctionProvider {
> HashFunction getHashFunction(Schema schema);
> }
> public interface HashFunction extends Function {
> Integer apply(DataChangeEvent event);
> } {code}
> add {{getHashFunctionProvider}} method to {{DataSink}}
>  
> {code:java}
> public interface DataSink {
> /** Get the {@link EventSinkProvider} for writing changed data to 
> external systems. */
> EventSinkProvider getEventSinkProvider();
> /** Get the {@link MetadataApplier} for applying metadata changes to 
> external systems. */
> MetadataApplier getMetadataApplier();
> default HashFunctionProvider getHashFunctionProvider() {
> return new DefaultHashFunctionProvider();
> }
> } {code}
> and re-implement {{PrePartitionOperator}} {{recreateHashFunction}} method.
> {code:java}
> private HashFunction recreateHashFunction(TableId tableId) {
> return 
> hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId));
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35237) Allow Sink to Choose HashFunction in PrePartitionOperator

2024-05-10 Thread zhangdingxin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangdingxin updated FLINK-35237:
-
Summary: Allow Sink to Choose HashFunction in PrePartitionOperator  (was: 
Allow Sink to Choose HashFunction in PrePartitionOperato)

> Allow Sink to Choose HashFunction in PrePartitionOperator
> -
>
> Key: FLINK-35237
> URL: https://issues.apache.org/jira/browse/FLINK-35237
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: zhangdingxin
>Priority: Major
>
> The {{PrePartitionOperator}} in its current implementation only supports a 
> fixed {{HashFunction}} 
> ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}).
>  This limits the ability of Sink implementations to customize the 
> partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of 
> partitioned tables, it would be advantageous to allow hashing based on 
> partition keys, hashing according to table names, or using the database 
> engine's internal primary key hash functions (such as with MaxCompute 
> DataSink).
> When users require such custom partitioning logic, they are compelled to 
> implement their PartitionOperator, which undermines the utility of 
> {{{}PrePartitionOperator{}}}.
> To address this limitation, it would be highly desirable to enable the 
> {{PrePartitionOperator}} to support user-specified custom 
> {{{}HashFunction{}}}s (Function). A possible 
> solution could involve a mechanism analogous to the {{DataSink}} interface, 
> allowing the specification of a {{HashFunctionProvider}} class path in the 
> configuration file. This enhancement would greatly facilitate users in 
> tailoring partition strategies to meet their specific application needs.
> In this case, I want to create new class {{HashFunctionProvider}} and 
> {{{}HashFunction{}}}:
> {code:java}
> public interface HashFunctionProvider {
> HashFunction getHashFunction(Schema schema);
> }
> public interface HashFunction extends Function {
> Integer apply(DataChangeEvent event);
> } {code}
> add {{getHashFunctionProvider}} method to {{DataSink}}
>  
> {code:java}
> public interface DataSink {
> /** Get the {@link EventSinkProvider} for writing changed data to 
> external systems. */
> EventSinkProvider getEventSinkProvider();
> /** Get the {@link MetadataApplier} for applying metadata changes to 
> external systems. */
> MetadataApplier getMetadataApplier();
> default HashFunctionProvider getHashFunctionProvider() {
> return new DefaultHashFunctionProvider();
> }
> } {code}
> and re-implement {{PrePartitionOperator}} {{recreateHashFunction}} method.
> {code:java}
> private HashFunction recreateHashFunction(TableId tableId) {
> return 
> hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId));
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35237) Allow Sink to Choose HashFunction in PrePartitionOperator for Flink Sink Customization

2024-05-10 Thread zhangdingxin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangdingxin updated FLINK-35237:
-
Description: 
The {{PrePartitionOperator}} in its current implementation only supports a 
fixed {{HashFunction}} 
({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}).
 This limits the ability of Sink implementations to customize the partitioning 
logic for {{{}DataChangeEvent{}}}s. For example, in the case of partitioned 
tables, it would be advantageous to allow hashing based on partition keys, 
hashing according to table names, or using the database engine's internal 
primary key hash functions (such as with MaxCompute DataSink).

When users require such custom partitioning logic, they are compelled to 
implement their PartitionOperator, which undermines the utility of 
{{{}PrePartitionOperator{}}}.

To address this limitation, it would be highly desirable to enable the 
{{PrePartitionOperator}} to support user-specified custom {{{}HashFunction{}}}s 
(Function). A possible solution could involve a 
mechanism analogous to the {{DataSink}} interface, allowing the specification 
of a {{HashFunctionProvider}} class path in the configuration file. This 
enhancement would greatly facilitate users in tailoring partition strategies to 
meet their specific application needs.

In this case, I want to create new class {{HashFunctionProvider}} and 
{{{}HashFunction{}}}:
{code:java}
public interface HashFunctionProvider {
HashFunction getHashFunction(Schema schema);
}

public interface HashFunction extends Function {
Integer apply(DataChangeEvent event);
} {code}
add {{getHashFunctionProvider}} method to {{DataSink}}

 
{code:java}
public interface DataSink {

/** Get the {@link EventSinkProvider} for writing changed data to external 
systems. */
EventSinkProvider getEventSinkProvider();

/** Get the {@link MetadataApplier} for applying metadata changes to 
external systems. */
MetadataApplier getMetadataApplier();

default HashFunctionProvider getHashFunctionProvider() {
return new DefaultHashFunctionProvider();
}
} {code}
and re-implement {{PrePartitionOperator}} {{recreateHashFunction}} method.
{code:java}
private HashFunction recreateHashFunction(TableId tableId) {
return 
hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId));
} {code}
 

  was:
The {{PrePartitionOperator}} in its current implementation only supports a 
fixed {{HashFunction}} 
({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}).
 This limits the ability of Sink implementations to customize the partitioning 
logic for {{{}DataChangeEvent{}}}s. For example, in the case of partitioned 
tables, it would be advantageous to allow hashing based on partition keys, 
hashing according to table names, or using the database engine's internal 
primary key hash functions (such as with MaxCompute DataSink).

When users require such custom partitioning logic, they are compelled to 
implement their PartitionOperator, which undermines the utility of 
{{{}PrePartitionOperator{}}}.

To address this limitation, it would be highly desirable to enable the 
{{PrePartitionOperator}} to support user-specified custom {{{}HashFunction{}}}s 
(Function). A possible solution could involve a 
mechanism analogous to the {{DataSink}} interface, allowing the specification 
of a {{HashFunctionFactory}} class path in the configuration file. This 
enhancement would greatly facilitate users in tailoring partition strategies to 
meet their specific application needs.

Summary: Allow Sink to Choose HashFunction in PrePartitionOperator for 
Flink Sink Customization  (was: Allow Custom HashFunction in 
PrePartitionOperator for Flink Sink Customization)

> Allow Sink to Choose HashFunction in PrePartitionOperator for Flink Sink 
> Customization
> --
>
> Key: FLINK-35237
> URL: https://issues.apache.org/jira/browse/FLINK-35237
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: zhangdingxin
>Priority: Major
>
> The {{PrePartitionOperator}} in its current implementation only supports a 
> fixed {{HashFunction}} 
> ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}).
>  This limits the ability of Sink implementations to customize the 
> partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of 
> partitioned tables, it would be advantageous to allow hashing based on 
> partition keys, hashing according to table names, or using the database 
> engine's internal primary key hash functions (such as with MaxCompute 
> DataSink).
> When users require such custom partitioning logic, they are compelled to 
> implement

[PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]

2024-05-10 Thread via GitHub


yuxiqian opened a new pull request, #3310:
URL: https://github.com/apache/flink-cdc/pull/3310

   Since the Flink CDC 3.0 release, pipeline connectors are first-class 
connectors. At the same time, all previous connectors designed for Flink are 
now called "legacy" connectors, which is misleading since they're still under 
active development and improvement, and there's no plan to drop support for 
them.
   
   This PR rectifies the term from "Legacy CDC Sources" to "CDC Sources for 
Flink" in the CDC documentation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35089][runtime] Initialize lastRecordAttributes in AbstractStreamOperator during setup [flink]

2024-05-10 Thread via GitHub


elvirasru commented on PR #24655:
URL: https://github.com/apache/flink/pull/24655#issuecomment-2104180848

   Hello, 
   
   Could you tell me when the new version with this fix will be released?
   
   Thank you in advance!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35197][table] Support the execution of supsend&resume materialized table in continuous refresh mode [flink]

2024-05-10 Thread via GitHub


lsyldliu commented on code in PR #24765:
URL: https://github.com/apache/flink/pull/24765#discussion_r1596318366


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.Map;
+
+/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. */
+@Internal
+public class AlterMaterializedTableResumeOperation extends 
AlterMaterializedTableOperation {
+
+private final Map options;

Review Comment:
   It would be better to rename it to `dynamicOptions`.



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.Map;
+
+/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. */
+@Internal
+public class AlterMaterializedTableResumeOperation extends 
AlterMaterializedTableOperation {
+
+private final Map options;
+
+public AlterMaterializedTableResumeOperation(
+ObjectIdentifier tableIdentifier, Map options) {
+super(tableIdentifier);
+this.options = options;
+}
+
+public Map getOptions() {
+return options;
+}
+
+@Override
+public TableResultInternal execute(Context ctx) {
+throw new TableException("This method shouldn't be called.");

Review Comment:
   We should throw the following exception:
   ```
   throw new UnsupportedOperationException(
   "AlterMaterializedTableResumeOperation doesn't support 
ExecutableOperation yet.")
   ```



##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -169,6 +284,75 @@ private static void createMaterializedInContinuousMode(
 }
 }
 
+private static String stopJobWithSavepoint(
+OperationExecutor executor,
+OperationHandle handle,
+ContinuousRefreshHandler refreshHandler) {
+ResultFetcher resultFetcher =
+executor.callStopJobOperation(
+executor.getTableEnvironment(),
+handle,
+new StopJobOperation(refreshHandler.getJobId(), true, 
false));
+List results = fetchAllResults(resultFetcher);
+return results.get(0).getString(0).toString();
+}
+
+private static ContinuousRefreshHandler deserializeContinuousHandler(
+byte[] serializedRefreshHandler, ClassLoader classLoader) {
+try {
+return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+serializedRefreshHandler, class

Re: [PR] [FLINK-34993] parser changes to support model CRUD sql [flink]

2024-05-10 Thread via GitHub


flinkbot commented on PR #24769:
URL: https://github.com/apache/flink/pull/24769#issuecomment-2104149510

   
   ## CI report:
   
   * add5f94934ccdbc9d2ad7e8624b00d27b61fb6a4 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34993] parser changes to support model CRUD sql [flink]

2024-05-10 Thread via GitHub


lihaosky commented on PR #24769:
URL: https://github.com/apache/flink/pull/24769#issuecomment-2104148834

   cc @twalthr , @wuchong


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35331) Download links for binary releases are displayed as source releases on website

2024-05-10 Thread Xintong Song (Jira)
Xintong Song created FLINK-35331:


 Summary: Download links for binary releases are displayed as 
source releases on website
 Key: FLINK-35331
 URL: https://issues.apache.org/jira/browse/FLINK-35331
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Xintong Song


Take Pre-bundled Hadoop as an example. The downloadable content is a binary 
release, while the link is displayed as "Pre-bundled Hadoop 2.x.y Source 
Release (asc, sha512)". The problem is caused by misusing 
`source_release_[url|asc_url|sha512_url]` for binary contents in the 
corresponding [yaml 
file|https://github.com/apache/flink-web/blob/asf-site/docs/data/additional_components.yml].

There are many similar cases on the webpage.

A related issue is that some source releases are displayed as "XXX 
Source Release Source Release", due to including "Source Release" in the `name` 
field of the corresponding yaml file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34993) Support Model CRUD in parser

2024-05-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34993:
---
Labels: pull-request-available  (was: )

> Support Model CRUD in parser
> 
>
> Key: FLINK-34993
> URL: https://issues.apache.org/jira/browse/FLINK-34993
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Hao Li
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-34993] parser changes to support model CRUD sql [flink]

2024-05-10 Thread via GitHub


lihaosky opened a new pull request, #24769:
URL: https://github.com/apache/flink/pull/24769

   ## What is the purpose of the change
   
   Support model CRUD SQL syntax
   
   
   ## Brief change log
   
   Add the following syntax for models:
   * `create model`
   * `drop model`
   * `show model`
   * `show create model`
   * `alter model`
   
   
   ## Verifying this change
   
   Unit test for parser
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (n)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-34324) s3_setup is called in test_file_sink.sh even if the common_s3.sh is not sourced

2024-05-10 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845238#comment-17845238
 ] 

Matthias Pohl edited comment on FLINK-34324 at 5/10/24 8:07 AM:


* master: 
[93526c2f3247598ce80854cf65dd4440eb5aaa43|https://github.com/apache/flink/commit/93526c2f3247598ce80854cf65dd4440eb5aaa43]
* 1.19: 
[8707c63ee147085671a9ae1b294854bac03fc914|https://github.com/apache/flink/commit/8707c63ee147085671a9ae1b294854bac03fc914]
* 1.18: 
[7d98ab060be82fe3684d15501b9eb83373303d18|https://github.com/apache/flink/commit/7d98ab060be82fe3684d15501b9eb83373303d18]


was (Author: mapohl):
* master
** 
[93526c2f3247598ce80854cf65dd4440eb5aaa43|https://github.com/apache/flink/commit/93526c2f3247598ce80854cf65dd4440eb5aaa43]
* 1.19
** 
[8707c63ee147085671a9ae1b294854bac03fc914|https://github.com/apache/flink/commit/8707c63ee147085671a9ae1b294854bac03fc914]
* 1.18
** 
[7d98ab060be82fe3684d15501b9eb83373303d18|https://github.com/apache/flink/commit/7d98ab060be82fe3684d15501b9eb83373303d18]

> s3_setup is called in test_file_sink.sh even if the common_s3.sh is not 
> sourced
> ---
>
> Key: FLINK-34324
> URL: https://issues.apache.org/jira/browse/FLINK-34324
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility, Tests
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> See example CI run from the FLINK-34150 PR:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56570&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=3191
> {code}
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/test_file_sink.sh: 
> line 38: s3_setup: command not found
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34324) s3_setup is called in test_file_sink.sh even if the common_s3.sh is not sourced

2024-05-10 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl resolved FLINK-34324.
---
Fix Version/s: 1.18.2
   1.20.0
   1.19.1
   Resolution: Fixed

* master
** 
[93526c2f3247598ce80854cf65dd4440eb5aaa43|https://github.com/apache/flink/commit/93526c2f3247598ce80854cf65dd4440eb5aaa43]
* 1.19
** 
[8707c63ee147085671a9ae1b294854bac03fc914|https://github.com/apache/flink/commit/8707c63ee147085671a9ae1b294854bac03fc914]
* 1.18
** 
[7d98ab060be82fe3684d15501b9eb83373303d18|https://github.com/apache/flink/commit/7d98ab060be82fe3684d15501b9eb83373303d18]

> s3_setup is called in test_file_sink.sh even if the common_s3.sh is not 
> sourced
> ---
>
> Key: FLINK-34324
> URL: https://issues.apache.org/jira/browse/FLINK-34324
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility, Tests
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Major
>  Labels: pull-request-available, test-stability
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> See example CI run from the FLINK-34150 PR:
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56570&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=3191
> {code}
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/test_file_sink.sh: 
> line 38: s3_setup: command not found
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-1.18][FLINK-34324][ci] Replaces AWS-based S3 e2e tests with Minio-backed version [flink]

2024-05-10 Thread via GitHub


XComp merged PR #24604:
URL: https://github.com/apache/flink/pull/24604


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.19][FLINK-34324][ci] Replaces AWS-based S3 e2e tests with Minio-backed version [flink]

2024-05-10 Thread via GitHub


XComp merged PR #24605:
URL: https://github.com/apache/flink/pull/24605


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34324][ci] Replaces AWS-based S3 e2e tests with Minio-backed version [flink]

2024-05-10 Thread via GitHub


XComp merged PR #24465:
URL: https://github.com/apache/flink/pull/24465


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]

2024-05-10 Thread via GitHub


snuyanzin commented on PR #24659:
URL: https://github.com/apache/flink/pull/24659#issuecomment-2104083386

   Thanks for the contribution @empathy87, and thanks for the review @jeyhunkarimov.
   It looks OK to me; I left a minor comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-33989) Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector

2024-05-10 Thread Flaviu Cicio (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842254#comment-17842254
 ] 

Flaviu Cicio edited comment on FLINK-33989 at 5/10/24 7:30 AM:
---

Thank you all for your involvement!
 
From my understanding, FLINK-9528 only refers to the internal Flink changelog.
In our case, the issue is when writing to Kafka using the upsert-kafka sink.
As the connector's name suggests, the sink should produce an upsert stream. 
That is, for each update, there should be an update-after record.
Currently, the sink produces a retraction-like stream. That is, for each update 
it generates a retraction (tombstone generated from the update-before record) 
and an update-after.
 
Based on 
[FLIP-149|https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector]
 and the [mail 
thread|https://www.mail-archive.com/dev@flink.apache.org/msg42543.html], which 
stands behind this change, I didn't find any statement implying this behavior.
The only mention I found was in this GitHub 
[comment|https://github.com/apache/flink/pull/13850#discussion_r515759845].
 
Thus, I propose a fix that skips the update-before records when writing to 
Kafka using the kafka-upsert connector.
This can be achieved by rewriting this [code 
block|https://github.com/apache/flink-connector-kafka/blob/369e7be46a70fd50d68746498aed82105741e7d6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L101]
 to:
 
{code:java}
if (kind == RowKind.DELETE) { 
  valueSerialized = null; 
} else if (kind == RowKind.UPDATE_BEFORE) { 
  return null; 
}
{code}


was (Author: JIRAUSER302297):
Thank you all for your involvement!
 
From my understanding, FLINK-9528 only refers to the internal Flink changelog.
In our case, the issue seems to be when writing to Kafka using the upsert-kafka 
sink.
As the connector's name suggests, the sink should produce an upsert stream. 
That is, for each update, there should be an update-after record.
Currently, the sink produces a retraction-like stream. That is, for each update 
it generates a retraction (tombstone generated from the update-before record) 
and an update-after.
 
Based on 
[FLIP-149|https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector]
 and the [mail 
thread|https://www.mail-archive.com/dev@flink.apache.org/msg42543.html], which 
stands behind this change, I didn't find any statement implying this behavior.
The only mention I found was in this GitHub 
[comment|https://github.com/apache/flink/pull/13850#discussion_r515759845].
 
Thus, I propose a fix that skips the update-before records when writing to 
Kafka using the kafka-upsert connector.
This can be achieved by rewriting this [code 
block|https://github.com/apache/flink-connector-kafka/blob/369e7be46a70fd50d68746498aed82105741e7d6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L101]
 to:
 
{code:java}
if (kind == RowKind.DELETE) { 
  valueSerialized = null; 
} else if (kind == RowKind.UPDATE_BEFORE) { 
  return null; 
}
{code}

> Insert Statement With Filter Operation Generates Extra Tombstone using Upsert 
> Kafka Connector
> -
>
> Key: FLINK-33989
> URL: https://issues.apache.org/jira/browse/FLINK-33989
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / Runtime
>Affects Versions: 1.17.2
>Reporter: Flaviu Cicio
>Priority: Major
>
> Given the following Flink SQL tables:
> {code:sql}
> CREATE TABLE input (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 
>   'topic' = 'input', 
>   'key.format' = 'raw', 
>   'properties.bootstrap.servers' = 'kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> );
> CREATE TABLE output (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 
>   'topic' = 'output', 
>   'key.format' = 'raw', 
>   'properties.bootstrap.servers' = 'kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> ); {code}
> And, the following entries are present in the input Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> If we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input; {code}
> The following entries are published to the output Kafka topic:
> {code:json}
>

Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]

2024-05-10 Thread via GitHub


snuyanzin commented on code in PR #24659:
URL: https://github.com/apache/flink/pull/24659#discussion_r1596379442


##
flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java:
##
@@ -86,4 +83,49 @@ public void testApplyPredicate() {
 OrcFilters.Predicate predicate8 = new OrcFilters.And(predicate4, 
predicate6);
 assertThat(predicate7).hasToString(predicate8.toString());
 }
+
+@Test
+@SuppressWarnings("unchecked")
+public void testApplyPredicateReverse() {

Review Comment:
   ```suggestion
  void testApplyPredicateReverse() {
   ```
   JUnit 5 doesn't need public modifiers, so it's better to restrict their visibility.
   Could you please also do the same for the test method above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] chore: fix typo [flink-kubernetes-operator]

2024-05-10 Thread via GitHub


1996fanrui commented on PR #812:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/812#issuecomment-2104048726

   This PR has been open for 1 month and has had no response for more than 2 weeks. 
   
   I'll close it for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] chore: fix typo [flink-kubernetes-operator]

2024-05-10 Thread via GitHub


1996fanrui closed pull request #812: chore: fix typo
URL: https://github.com/apache/flink-kubernetes-operator/pull/812


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]

2024-05-10 Thread via GitHub


wenbingshen commented on code in PR #827:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1596355996


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##
@@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo 
jobDetailsInfo) {
 json, slotSharingGroupIdMap, maxParallelismMap, metrics, 
finished);
 }
 
-private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, 
JobTopology topology)
-throws Exception {
+private void updateKafkaPulsarSourceMaxParallelisms(
+Context ctx, JobID jobId, JobTopology topology) throws Exception {
 try (var restClient = ctx.getRestClusterClient()) {
-var partitionRegex = 
Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
+Pattern partitionRegex =
+Pattern.compile(
+
"^.*\\.KafkaSourceReader\\.topic\\.(?.+)\\.partition\\.(?\\d+)\\.currentOffset$"
++ 
"|^.*\\.PulsarConsumer\\.(?.+)-partition-(?\\d+)\\..*\\.numMsgsReceived$");
 for (var vertexInfo : topology.getVertexInfos().values()) {
 if (vertexInfo.getInputs().isEmpty()) {
 var sourceVertex = vertexInfo.getId();
 var numPartitions =
 queryAggregatedMetricNames(restClient, jobId, 
sourceVertex).stream()
-.filter(partitionRegex.asMatchPredicate())
-.count();
+.map(

Review Comment:
   Yes, according to my search, the Pulsar source does not expose purely 
partition-level metrics; all partition-level metrics are associated with the 
consumerId. Thanks for the review. :)
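
   A hedged sketch of the deduplicating count this implies (the named groups below 
reconstruct what the quoted regex appears to capture and are an assumption; metric 
names and class names are made up):
   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   // Hedged sketch: count distinct (topic, partition) pairs from source metric names,
   // so several Pulsar consumer metrics for the same partition count only once.
   final class SourcePartitionCounter {

       private static final Pattern PARTITION_PATTERN =
               Pattern.compile(
                       "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaId>\\d+)\\.currentOffset$"
                               + "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarId>\\d+)\\..*\\.numMsgsReceived$");

       static int countPartitions(List<String> metricNames) {
           Set<String> partitions = new HashSet<>();
           for (String name : metricNames) {
               Matcher m = PARTITION_PATTERN.matcher(name);
               if (!m.matches()) {
                   continue;
               }
               String topic = m.group("kafkaTopic") != null ? m.group("kafkaTopic") : m.group("pulsarTopic");
               String partition = m.group("kafkaId") != null ? m.group("kafkaId") : m.group("pulsarId");
               partitions.add(topic + "#" + partition);
           }
           return partitions.size();
       }
   }
   ```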



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


