Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableString()); +customConfig.set(NAME, jobName); +customConfig.set(RUNTIME_MODE, BATCH); + +StringBuilder insertStatement = +new StringBuilder( +String.format( +"INSERT OVERWRITE %s SELECT * FROM (%s)", +materializedTableIdentifier, +materializedTable.getDefinitionQuery())); + +if 
(!partitionSpec.isEmpty()) { +insertStatement.append(" WHERE "); +insertStatement.append( +partitionSpec.entrySet().stream() +.map( +entry -> +String.format( +"%s = '%s'", entry.getKey(), entry.getValue())) +.reduce((s1, s2) -> s1 + " AND " + s2) +.get()); +} + +try { +LOG.debug( +"Begin to manually refreshing the materialization table {}, statement: {}", +materializedTableIdentifier, +insertStatement); +return operationExecutor.executeStatement( +handle, customConfig, insertStatement.toString()); +} catch (Exception e) { +// log and throw exception +LOG.error( +"Manually refreshing the materialization table {} occur exception.", +materializedTableIdentifier, +e); +throw new SqlExecutionException( +String.format( +"Manually refreshing the materialization table %s occur exception.", +materializedTableIdentifier), +e); +} +} + +private static void validatePartitionSpec( +Map partitionSpec, ResolvedCatalogMaterializedTable table) { +ResolvedSchema schema = table.getResolvedSchema(); +Set allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + +Set unknownPartitionKeys = new HashSet<>(); +Set nonStringPartitionKeys = new HashSet<>(); + +for (String partitionKey : partitionSpec.keySet()) { +if (!schema.getColumn(partitionKey).isPresent()) { +unknownPartitionKeys.add(partitionKey); +continue; +} + +if (!schema.getColumn(partitionKey) +.get() +.getDataType() +.getLogicalType() +.getTypeRoot() +.getFamilies() +.contains(LogicalTypeFamil
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableString()); +customConfig.set(NAME, jobName); +customConfig.set(RUNTIME_MODE, BATCH); + +StringBuilder insertStatement = +new StringBuilder( +String.format( +"INSERT OVERWRITE %s SELECT * FROM (%s)", +materializedTableIdentifier, +materializedTable.getDefinitionQuery())); + +if 
(!partitionSpec.isEmpty()) { +insertStatement.append(" WHERE "); +insertStatement.append( +partitionSpec.entrySet().stream() +.map( +entry -> +String.format( +"%s = '%s'", entry.getKey(), entry.getValue())) +.reduce((s1, s2) -> s1 + " AND " + s2) +.get()); +} + +try { +LOG.debug( +"Begin to manually refreshing the materialization table {}, statement: {}", +materializedTableIdentifier, +insertStatement); +return operationExecutor.executeStatement( +handle, customConfig, insertStatement.toString()); +} catch (Exception e) { +// log and throw exception +LOG.error( +"Manually refreshing the materialization table {} occur exception.", +materializedTableIdentifier, +e); +throw new SqlExecutionException( +String.format( +"Manually refreshing the materialization table %s occur exception.", +materializedTableIdentifier), +e); +} +} + +private static void validatePartitionSpec( +Map partitionSpec, ResolvedCatalogMaterializedTable table) { +ResolvedSchema schema = table.getResolvedSchema(); +Set allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + +Set unknownPartitionKeys = new HashSet<>(); +Set nonStringPartitionKeys = new HashSet<>(); + +for (String partitionKey : partitionSpec.keySet()) { +if (!schema.getColumn(partitionKey).isPresent()) { +unknownPartitionKeys.add(partitionKey); +continue; +} + +if (!schema.getColumn(partitionKey) +.get() +.getDataType() +.getLogicalType() +.getTypeRoot() +.getFamilies() +.contains(LogicalTypeFamil
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableString()); +customConfig.set(NAME, jobName); +customConfig.set(RUNTIME_MODE, BATCH); + +StringBuilder insertStatement = +new StringBuilder( +String.format( +"INSERT OVERWRITE %s SELECT * FROM (%s)", +materializedTableIdentifier, +materializedTable.getDefinitionQuery())); + +if 
(!partitionSpec.isEmpty()) { +insertStatement.append(" WHERE "); +insertStatement.append( +partitionSpec.entrySet().stream() +.map( +entry -> +String.format( +"%s = '%s'", entry.getKey(), entry.getValue())) +.reduce((s1, s2) -> s1 + " AND " + s2) +.get()); +} + +try { +LOG.debug( +"Begin to manually refreshing the materialization table {}, statement: {}", +materializedTableIdentifier, +insertStatement); +return operationExecutor.executeStatement( +handle, customConfig, insertStatement.toString()); +} catch (Exception e) { +// log and throw exception +LOG.error( +"Manually refreshing the materialization table {} occur exception.", +materializedTableIdentifier, +e); +throw new SqlExecutionException( +String.format( +"Manually refreshing the materialization table %s occur exception.", +materializedTableIdentifier), +e); +} +} + +private static void validatePartitionSpec( +Map partitionSpec, ResolvedCatalogMaterializedTable table) { +ResolvedSchema schema = table.getResolvedSchema(); +Set allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + +Set unknownPartitionKeys = new HashSet<>(); +Set nonStringPartitionKeys = new HashSet<>(); + +for (String partitionKey : partitionSpec.keySet()) { +if (!schema.getColumn(partitionKey).isPresent()) { +unknownPartitionKeys.add(partitionKey); +continue; +} + +if (!schema.getColumn(partitionKey) +.get() +.getDataType() +.getLogicalType() +.getTypeRoot() +.getFamilies() +.contains(LogicalTypeFamil
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableString()); +customConfig.set(NAME, jobName); +customConfig.set(RUNTIME_MODE, BATCH); + +StringBuilder insertStatement = +new StringBuilder( +String.format( +"INSERT OVERWRITE %s SELECT * FROM (%s)", +materializedTableIdentifier, +materializedTable.getDefinitionQuery())); + +if 
(!partitionSpec.isEmpty()) { +insertStatement.append(" WHERE "); +insertStatement.append( +partitionSpec.entrySet().stream() +.map( +entry -> +String.format( +"%s = '%s'", entry.getKey(), entry.getValue())) +.reduce((s1, s2) -> s1 + " AND " + s2) +.get()); +} + +try { +LOG.debug( +"Begin to manually refreshing the materialization table {}, statement: {}", +materializedTableIdentifier, +insertStatement); +return operationExecutor.executeStatement( +handle, customConfig, insertStatement.toString()); +} catch (Exception e) { +// log and throw exception +LOG.error( +"Manually refreshing the materialization table {} occur exception.", +materializedTableIdentifier, +e); +throw new SqlExecutionException( +String.format( +"Manually refreshing the materialization table %s occur exception.", +materializedTableIdentifier), +e); +} +} + +private static void validatePartitionSpec( +Map partitionSpec, ResolvedCatalogMaterializedTable table) { +ResolvedSchema schema = table.getResolvedSchema(); +Set allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + +Set unknownPartitionKeys = new HashSet<>(); +Set nonStringPartitionKeys = new HashSet<>(); + +for (String partitionKey : partitionSpec.keySet()) { +if (!schema.getColumn(partitionKey).isPresent()) { +unknownPartitionKeys.add(partitionKey); +continue; +} + +if (!schema.getColumn(partitionKey) +.get() +.getDataType() +.getLogicalType() +.getTypeRoot() +.getFamilies() +.contains(LogicalTypeFamil
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian commented on PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#issuecomment-2105596048 Thanks @aiwenmo for reviewing, addressed review comments above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][docs] Fix dead links in documentations [flink-cdc]
yuxiqian commented on PR #3314: URL: https://github.com/apache/flink-cdc/pull/3314#issuecomment-2105593186 @leonardBang PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix][docs] Fix dead links in documentations [flink-cdc]
yuxiqian opened a new pull request, #3314: URL: https://github.com/apache/flink-cdc/pull/3314 This PR fixes dead links introduced by #3310. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35313][mysql-cdc] Add upsert changelog mode to avoid UPDATE_BEFORE records … [flink-cdc]
yeezychao commented on PR #1907: URL: https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105588522 > > @yuxiqian Turning on upsert mode still fails to filter -u data. I am very confused as to why the same PR application failed to test in cdc 3.2 (flink 1.18) version, but it still works in version 2.2 (flink 1.15). Unfortunately, I have not found the reason yet. ![image](https://private-user-images.githubusercontent.com/18387619/329751040-32609ad6-9730-4bf5-83c6-91f1343dc617.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MTU0MDA1MjAsIm5iZiI6MTcxNTQwMDIyMCwicGF0aCI6Ii8xODM4NzYxOS8zMjk3NTEwNDAtMzI2MDlhZDYtOTczMC00YmY1LTgzYzYtOTFmMTM0M2RjNjE3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA1MTElMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNTExVDA0MDM0MFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTI1NjQzNjYzN2I2YTU3NzAwYjA0ZDkzODIxMTU0ZTc2ZjQ1YmFiYmZmOTViNzk1MDNmNzBmNDkzYjIwMmIwNGUmWC1BbXotU2lnbmVkSGVhZG Vycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.kfPitV5gbKlL5u6DiGPoxjmPCZUsBpg62S5dTlOigrs) > > @yeezychao Maybe check the output log and confirm if MySQL source actually sends any `-U` events to downstream? IIRC Flink will automatically append a `ChangelogNormalize` node to backfill missing update before events if source doesn't provide it. You are right!The `ChangelogNormalize` node is indeed added under the Flink 1.18 version,but the Flink 1.15 version haven't. ![image](https://github.com/apache/flink-cdc/assets/18387619/f25a44fe-df3a-43d6-b48e-98fc1be487d8) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-28867) Parquet reader support nested type in array/map type
[ https://issues.apache.org/jira/browse/FLINK-28867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845539#comment-17845539 ] Xingcan Cui commented on FLINK-28867: - Hey [~jark], any plan to improve this in the near future? I feel that this is a blocker for Flink OLAP despite the data lake projects having their data readers/writers. Sometimes users would like to use Flink to process some raw parquet files. > Parquet reader support nested type in array/map type > > > Key: FLINK-28867 > URL: https://issues.apache.org/jira/browse/FLINK-28867 > Project: Flink > Issue Type: Sub-task > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.16.0 >Reporter: dalongliu >Priority: Major > Attachments: ReadParquetArray1.java, part-00121.parquet > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
yuxiqian commented on PR #3310: URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105586766 @leonardBang It seems I missed some `{{ref}}` hyperlinks. I will fix them first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
leonardBang commented on PR #3310: URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105584829 @yuxiqian Will appreciate if you can also open PR for release-3.0 and release-3.1 branches. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
leonardBang merged PR #3310: URL: https://github.com/apache/flink-cdc/pull/3310 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
leonardBang commented on PR #3310: URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105580668 > @leonardBang Sure, reordered sidebar items based on DB-Engine Rankings. Thanks for the update; basing the ordering on the DB-Engine Ranking makes sense to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
yuxiqian commented on PR #3310: URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105577742 @leonardBang Sure, reordered sidebar items based on DB-Engine Rankings. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597351018 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java: ## @@ -221,73 +240,67 @@ private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws return tableInfo; } -private void transformSchema(TableId tableId, Schema schema) throws Exception { -for (Tuple4, Optional, Boolean> -transform : transforms) { -Selectors selectors = transform.f0; -if (selectors.isMatch(tableId) && transform.f1.isPresent()) { -TransformProjection transformProjection = transform.f1.get(); +private Schema transformSchema(TableId tableId, Schema schema) throws Exception { +List newSchemas = new ArrayList<>(); +for (PostTransformers transform : transforms) { +Selectors selectors = transform.getSelectors(); +if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) { +TransformProjection transformProjection = transform.getProjection().get(); +TransformFilter transformFilter = transform.getFilter().orElse(null); if (transformProjection.isValid()) { if (!transformProjectionProcessorMap.containsKey(transformProjection)) { transformProjectionProcessorMap.put( transformProjection, - TransformProjectionProcessor.of(transformProjection)); +PostTransformProcessor.of(transformProjection, transformFilter)); } -TransformProjectionProcessor transformProjectionProcessor = +PostTransformProcessor postTransformProcessor = transformProjectionProcessorMap.get(transformProjection); // update the columns of projection and add the column of projection into Schema - transformProjectionProcessor.processSchemaChangeEvent(schema); + newSchemas.add(postTransformProcessor.processSchemaChangeEvent(schema)); } } } +if (newSchemas.isEmpty()) { +return schema; +} + +Schema firstSchema = newSchemas.get(0); +newSchemas.stream() +.skip(1) +.forEach( +testSchema -> { +if (!testSchema.equals(firstSchema)) { +throw new 
IllegalArgumentException( +String.format( +"Incompatible transform rules result found. Inferred schema: %s and %s", +firstSchema, testSchema)); +} +}); +return firstSchema; Review Comment: Added `SchemaUtils.mergeCompatibleUpstreamSchema` to resolve this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
yuxiqian commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597350940 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java: ## @@ -384,10 +481,73 @@ public static SqlSelect parseFilterExpression(String filterExpression) { StringBuilder statement = new StringBuilder(); statement.append("SELECT * FROM "); statement.append(DEFAULT_TABLE); -if (!StringUtils.isNullOrWhitespaceOnly(filterExpression)) { +if (!isNullOrWhitespaceOnly(filterExpression)) { statement.append(" WHERE "); statement.append(filterExpression); } return parseSelect(statement.toString()); } + +public static SqlNode rewriteExpression(SqlNode sqlNode, Map replaceMap) { +if (sqlNode instanceof SqlBasicCall) { +SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode; Review Comment: Replace `SqlCall` instead of `SqlBaseCall` to cover `SqlCase`. ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java: ## @@ -384,10 +481,73 @@ public static SqlSelect parseFilterExpression(String filterExpression) { StringBuilder statement = new StringBuilder(); statement.append("SELECT * FROM "); statement.append(DEFAULT_TABLE); -if (!StringUtils.isNullOrWhitespaceOnly(filterExpression)) { +if (!isNullOrWhitespaceOnly(filterExpression)) { statement.append(" WHERE "); statement.append(filterExpression); } return parseSelect(statement.toString()); } + +public static SqlNode rewriteExpression(SqlNode sqlNode, Map replaceMap) { +if (sqlNode instanceof SqlBasicCall) { +SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode; Review Comment: Check `SqlCall` instead of `SqlBaseCall` to cover `SqlCase`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35313][mysql-cdc] Add upsert changelog mode to avoid UPDATE_BEFORE records … [flink-cdc]
yuxiqian commented on PR #1907: URL: https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105530310 > @yuxiqian Turning on upsert mode still fails to filter -u data. I am very confused as to why the same PR application failed to test in cdc 3.2 (flink 1.18) version, but it still works in version 2.2 (flink 1.15). Unfortunately, I have not found the reason yet. ![image](https://private-user-images.githubusercontent.com/18387619/329751040-32609ad6-9730-4bf5-83c6-91f1343dc617.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MTU0MDA1MjAsIm5iZiI6MTcxNTQwMDIyMCwicGF0aCI6Ii8xODM4NzYxOS8zMjk3NTEwNDAtMzI2MDlhZDYtOTczMC00YmY1LTgzYzYtOTFmMTM0M2RjNjE3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA1MTElMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNTExVDA0MDM0MFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTI1NjQzNjYzN2I2YTU3NzAwYjA0ZDkzODIxMTU0ZTc2ZjQ1YmFiYmZmOTViNzk1MDNmNzBmNDkzYjIwMmIwNGUmWC1BbXotU2lnbmVkSGVhZGVy cz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.kfPitV5gbKlL5u6DiGPoxjmPCZUsBpg62S5dTlOigrs) Maybe checking the printed log and confirm if MySQL source actually sends any `-U` events to downstream? IIRC Flink will automatically append a `ChangelogNormalize` node to backfill missing update before events if source doesn't provide it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35313][mysql-cdc] Add upsert changelog mode to avoid UPDATE_BEFORE records … [flink-cdc]
yeezychao commented on PR #1907: URL: https://github.com/apache/flink-cdc/pull/1907#issuecomment-2105523327 @yuxiqian Turning on upsert mode still fails to filter -u data. I am very confused as to why the same PR application failed to test in cdc 3.2 (flink 1.18) version, but it still works in version 2.2 (flink 1.15). Unfortunately, I have not found the reason yet. ![image](https://github.com/apache/flink-cdc/assets/18387619/32609ad6-9730-4bf5-83c6-91f1343dc617) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35109) Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support for 1.17 and 1.18
[ https://issues.apache.org/jira/browse/FLINK-35109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845529#comment-17845529 ] Hang Ruan edited comment on FLINK-35109 at 5/11/24 3:26 AM: I would like to help it. was (Author: ruanhang1993): I would like to help. Please assign this to me, thanks~ > Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support > for 1.17 and 1.18 > --- > > Key: FLINK-35109 > URL: https://issues.apache.org/jira/browse/FLINK-35109 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Reporter: Martijn Visser >Assignee: Fabian Paul >Priority: Blocker > Fix For: kafka-4.0.0 > > > The Flink Kafka connector currently can't compile against Flink > 1.20-SNAPSHOT. An example failure can be found at > https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169 > The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, > see FLINK-32455. See also specifically the comment in > https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785 > Next to that, there's also FLINK-25509 which can only be supported with Flink > 1.19 and higher. > So we should: > * Drop support for 1.17 and 1.18 > * Refactor the Flink Kafka connector to use the new > {code:java}MigrationTest{code} > We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; > this change will be a new v4.0 version with support for Flink 1.19 and the > upcoming Flink 1.20 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35109) Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support for 1.17 and 1.18
[ https://issues.apache.org/jira/browse/FLINK-35109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845529#comment-17845529 ] Hang Ruan commented on FLINK-35109: --- I would like to help. Please assign this to me, thanks~ > Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support > for 1.17 and 1.18 > --- > > Key: FLINK-35109 > URL: https://issues.apache.org/jira/browse/FLINK-35109 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Reporter: Martijn Visser >Assignee: Fabian Paul >Priority: Blocker > Fix For: kafka-4.0.0 > > > The Flink Kafka connector currently can't compile against Flink > 1.20-SNAPSHOT. An example failure can be found at > https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169 > The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, > see FLINK-32455. See also specifically the comment in > https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785 > Next to that, there's also FLINK-25509 which can only be supported with Flink > 1.19 and higher. > So we should: > * Drop support for 1.17 and 1.18 > * Refactor the Flink Kafka connector to use the new > {code:java}MigrationTest{code} > We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; > this change will be a new v4.0 version with support for Flink 1.19 and the > upcoming Flink 1.20 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35112) Membership for Row class does not include field names
[ https://issues.apache.org/jira/browse/FLINK-35112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845526#comment-17845526 ] Dian Fu edited comment on FLINK-35112 at 5/11/24 2:56 AM: -- Fix in - master via b4d71144de8e3772257804b6ed8ad688076430d6 - release-1.19 via d16b20e4fb4fb906927188ca11af599edd0953c1 was (Author: dianfu): Fix in master via b4d71144de8e3772257804b6ed8ad688076430d6 > Membership for Row class does not include field names > - > > Key: FLINK-35112 > URL: https://issues.apache.org/jira/browse/FLINK-35112 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.18.1 >Reporter: Wouter Zorgdrager >Assignee: Wouter Zorgdrager >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > > In the Row class in PyFlink I cannot do a membership check for field names. > This minimal example will show the unexpected behavior: > ``` > from pyflink.common import Row > row = Row(name="Alice", age=11) > # Expected to be True, but is False > print("name" in row) > person = Row("name", "age") > # This is True, as expected > print('name' in person) > ``` > The related code in the Row class is: > ``` > def __contains__(self, item): > return item in self._values > ``` > It should be relatively easy to fix with the following code: > ``` > def __contains__(self, item): > if hasattr(self, "_fields"): > return item in self._fields > else: > return item in self._values > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35112) Membership for Row class does not include field names
[ https://issues.apache.org/jira/browse/FLINK-35112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-35112: Fix Version/s: 1.19.1 > Membership for Row class does not include field names > - > > Key: FLINK-35112 > URL: https://issues.apache.org/jira/browse/FLINK-35112 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.18.1 >Reporter: Wouter Zorgdrager >Assignee: Wouter Zorgdrager >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0, 1.19.1 > > > In the Row class in PyFlink I cannot do a membership check for field names. > This minimal example will show the unexpected behavior: > ``` > from pyflink.common import Row > row = Row(name="Alice", age=11) > # Expected to be True, but is False > print("name" in row) > person = Row("name", "age") > # This is True, as expected > print('name' in person) > ``` > The related code in the Row class is: > ``` > def __contains__(self, item): > return item in self._values > ``` > It should be relatively easy to fix with the following code: > ``` > def __contains__(self, item): > if hasattr(self, "_fields"): > return item in self._fields > else: > return item in self._values > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-35112) Membership for Row class does not include field names
[ https://issues.apache.org/jira/browse/FLINK-35112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-35112. --- Fix Version/s: 1.20.0 Resolution: Fixed Fix in master via b4d71144de8e3772257804b6ed8ad688076430d6 > Membership for Row class does not include field names > - > > Key: FLINK-35112 > URL: https://issues.apache.org/jira/browse/FLINK-35112 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.18.1 >Reporter: Wouter Zorgdrager >Assignee: Wouter Zorgdrager >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > > In the Row class in PyFlink I cannot do a membership check for field names. > This minimal example will show the unexpected behavior: > ``` > from pyflink.common import Row > row = Row(name="Alice", age=11) > # Expected to be True, but is False > print("name" in row) > person = Row("name", "age") > # This is True, as expected > print('name' in person) > ``` > The related code in the Row class is: > ``` > def __contains__(self, item): > return item in self._values > ``` > It should be relatively easy to fix with the following code: > ``` > def __contains__(self, item): > if hasattr(self, "_fields"): > return item in self._fields > else: > return item in self._values > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35112) Membership for Row class does not include field names
[ https://issues.apache.org/jira/browse/FLINK-35112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu reassigned FLINK-35112: --- Assignee: Wouter Zorgdrager > Membership for Row class does not include field names > - > > Key: FLINK-35112 > URL: https://issues.apache.org/jira/browse/FLINK-35112 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.18.1 >Reporter: Wouter Zorgdrager >Assignee: Wouter Zorgdrager >Priority: Minor > Labels: pull-request-available > > In the Row class in PyFlink I cannot do a membership check for field names. > This minimal example will show the unexpected behavior: > ``` > from pyflink.common import Row > row = Row(name="Alice", age=11) > # Expected to be True, but is False > print("name" in row) > person = Row("name", "age") > # This is True, as expected > print('name' in person) > ``` > The related code in the Row class is: > ``` > def __contains__(self, item): > return item in self._values > ``` > It should be relatively easy to fix with the following code: > ``` > def __contains__(self, item): > if hasattr(self, "_fields"): > return item in self._fields > else: > return item in self._values > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35112][python] Fix membership for Row class PyFlink [flink]
dianfu merged PR #24756: URL: https://github.com/apache/flink/pull/24756 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35272][cdc][runtime] Transform projection & filter feature overhaul [flink-cdc]
aiwenmo commented on code in PR #3285: URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597329925 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java: ## @@ -362,25 +376,51 @@ private Optional processFilter( } private Optional processProjection( -TransformProjectionProcessor transformProjectionProcessor, +PostTransformProcessor postTransformProcessor, DataChangeEvent dataChangeEvent, -long epochTime) -throws Exception { +long epochTime) { BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before(); BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after(); if (before != null) { BinaryRecordData projectedBefore = -transformProjectionProcessor.processData(before, epochTime); +postTransformProcessor.processData(before, epochTime); +dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore); +} +if (after != null) { +BinaryRecordData projectedAfter = postTransformProcessor.processData(after, epochTime); +dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter); +} +return Optional.of(dataChangeEvent); +} + +private Optional processPostProjection( +TableInfo tableInfo, DataChangeEvent dataChangeEvent) throws Exception { +BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before(); +BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after(); +if (before != null) { +BinaryRecordData projectedBefore = projectRecord(tableInfo, before); dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore); } if (after != null) { -BinaryRecordData projectedAfter = -transformProjectionProcessor.processData(after, epochTime); +BinaryRecordData projectedAfter = projectRecord(tableInfo, after); dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter); } return Optional.of(dataChangeEvent); } +private BinaryRecordData projectRecord(TableInfo tableInfo, BinaryRecordData recordData) { +List 
valueList = new ArrayList<>(); +RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters(); + +for (RecordData.FieldGetter fieldGetter : fieldGetters) { +valueList.add(fieldGetter.getFieldOrNull(recordData)); +} + +return tableInfo +.getRecordDataGenerator() +.generate(valueList.toArray(new Object[valueList.size()])); +} + private boolean containFilteredComputedColumn(String projection, String filter) { boolean contain = false; Review Comment: Due to the filter always executing before the projection, this method is no longer useful. ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java: ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.transform; + +import org.apache.flink.cdc.common.schema.Selectors; + +import javax.annotation.Nullable; + +import java.util.Optional; + +/** Transformation rules used by {@link PostTransformOperator}. 
*/ +public class PostTransformers { +private final Selectors selectors; + +private final Optional projection; +private final Optional filter; + +private final boolean containFilteredComputedColumn; + Review Comment: Due to the filter always executing before the projection, this private variable is no longer useful. ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java: ## @@ -221,73 +240,67 @@ private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws return tableInfo; } -private void transformSchema(TableId tableId, Schema schema) throws Exception
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
lsyldliu commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597329094 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableString()); +customConfig.set(NAME, jobName); +customConfig.set(RUNTIME_MODE, BATCH); + +StringBuilder insertStatement = +new StringBuilder( +String.format( +"INSERT OVERWRITE %s SELECT * FROM (%s)", +materializedTableIdentifier, +materializedTable.getDefinitionQuery())); + +if 
(!partitionSpec.isEmpty()) { +insertStatement.append(" WHERE "); +insertStatement.append( +partitionSpec.entrySet().stream() +.map( +entry -> +String.format( +"%s = '%s'", entry.getKey(), entry.getValue())) +.reduce((s1, s2) -> s1 + " AND " + s2) +.get()); +} + +try { +LOG.debug( +"Begin to manually refreshing the materialization table {}, statement: {}", +materializedTableIdentifier, +insertStatement); +return operationExecutor.executeStatement( +handle, customConfig, insertStatement.toString()); +} catch (Exception e) { +// log and throw exception +LOG.error( +"Manually refreshing the materialization table {} occur exception.", +materializedTableIdentifier, +e); +throw new SqlExecutionException( +String.format( +"Manually refreshing the materialization table %s occur exception.", +materializedTableIdentifier), +e); +} +} + +private static void validatePartitionSpec( +Map partitionSpec, ResolvedCatalogMaterializedTable table) { +ResolvedSchema schema = table.getResolvedSchema(); +Set allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + +Set unknownPartitionKeys = new HashSet<>(); +Set nonStringPartitionKeys = new HashSet<>(); + +for (String partitionKey : partitionSpec.keySet()) { +if (!schema.getColumn(partitionKey).isPresent()) { +unknownPartitionKeys.add(partitionKey); +continue; +} + +if (!schema.getColumn(partitionKey) +.get() +.getDataType() +.getLogicalType() +.getTypeRoot() +.getFamilies() +.contains(LogicalTypeFamily.C
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
lsyldliu commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597326389 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableString()); +customConfig.set(NAME, jobName); +customConfig.set(RUNTIME_MODE, BATCH); + +StringBuilder insertStatement = +new StringBuilder( +String.format( +"INSERT OVERWRITE %s SELECT * FROM (%s)", +materializedTableIdentifier, +materializedTable.getDefinitionQuery())); + +if 
(!partitionSpec.isEmpty()) { +insertStatement.append(" WHERE "); Review Comment: I think the WHERE clause could be better formatted on a new line ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableSt
Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
leonardBang commented on PR #3310: URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2105456881 Thanks @yuxiqian for the contribution, could we move the pipeline-connectors to the first place? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35047][state] Shutdown StateExecutors when ForStKeyedStateBackend is disposed [flink]
fredia commented on code in PR #24768: URL: https://github.com/apache/flink/pull/24768#discussion_r1597330403 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java: ## @@ -147,15 +160,30 @@ public S createState(@Nonnull StateDescriptor stateDes @Override @Nonnull public StateExecutor createStateExecutor() { -// TODO: Make io parallelism configurable -return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); +synchronized (lock) { +if (disposed) { +throw new FlinkRuntimeException( +"Attempt to create StateExecutor after ForStKeyedStateBackend is disposed."); +} +// TODO: Make io parallelism configurable +StateExecutor stateExecutor = +new ForStStateExecutor(4, db, optionsContainer.getWriteOptions()); +managedStateExecutors.add(stateExecutor); +return stateExecutor; +} } /** Should only be called by one thread, and only after all accesses to the DB happened. */ @Override public void dispose() { -if (this.disposed) { -return; +synchronized (lock) { +if (disposed) { +return; +} +disposed = true; +for (StateExecutor executor : managedStateExecutors) { Review Comment: Can this `for-loop` be placed outside the lock? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]
1996fanrui commented on code in PR #24770: URL: https://github.com/apache/flink/pull/24770#discussion_r1597329545 ## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java: ## @@ -343,7 +343,11 @@ private static StreamStateHandle createDummySegmentFileStateHandle(Random rnd) { private static StreamStateHandle createDummySegmentFileStateHandle( Random rnd, boolean isEmpty) { return isEmpty -? TestingSegmentFileStateHandle.EMPTY_INSTANCE +? new TestingSegmentFileStateHandle( +new Path(UUID.randomUUID().toString()), Review Comment: 2 tests[1] failed, I changed the path from `empty` to random string to fix them. ## Reason: SharedStateRegistryImpl#registerReference uses `registrationKey` to check whether StreamStateHandle is same. The `registrationKey` is generated from path name. So all `empty` string generates the same `registrationKey`, but StreamStateHandle are not same. ## My question: Is there strong reason to use `empty` as the path name? If no, current change is fine. [1] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59448&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=9594 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35323][runtime] Fix transform failure when one rule matches multiple tables with incompatible schema [flink-cdc]
yuxiqian opened a new pull request, #3313: URL: https://github.com/apache/flink-cdc/pull/3313 This closes [FLINK-35323](https://issues.apache.org/jira/browse/FLINK-35323). Currently, Transform module doesn’t support handling multiple tables with different schema within one projection / filtering rule. A minimum reproducible example is: ```yaml transform: - source-table: DB.TABLE\.* projection: \* ``` If there are `DB.TABLE1` and `DB.TABLE2` in upstream source but with different schema, transform operator will panic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305](https://issues.apache.org/jira/browse/FLINK-35305)Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1597276173 ## flink-connector-aws/flink-connector-sqs/archunit-violations/stored.rules: ## @@ -0,0 +1,4 @@ +# +#Tue Feb 22 12:19:26 CET 2022 +Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=54da9a7d-14d2-4632-a045-1dd8fc665c8f Review Comment: I am not sure if these auto-generated files should be added to PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35305) FLIP-438: Amazon SQS Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-35305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35305: --- Labels: pull-request-available (was: ) > FLIP-438: Amazon SQS Sink Connector > --- > > Key: FLINK-35305 > URL: https://issues.apache.org/jira/browse/FLINK-35305 > Project: Flink > Issue Type: New Feature > Components: Connectors / AWS >Reporter: Priya Dhingra >Priority: Major > Labels: pull-request-available > > This is an umbrella task for FLIP-438. FLIP-438: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35305](https://issues.apache.org/jira/browse/FLINK-35305)Amazon SQS Sink Connector [flink-connector-aws]
boring-cyborg[bot] commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2105339886 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35305](https://issues.apache.org/jira/browse/FLINK-35305)Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra opened a new pull request, #141: URL: https://github.com/apache/flink-connector-aws/pull/141 ## Purpose of the change Implements the Amazon SQS Sink Connector ## Verifying this change This change added tests and can be verified as follows: Added unit tests Added integration tests Manually verified by running the SQS connector on a local Flink cluster. ## Significant changes *(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)* - [ ] Dependencies have been added or upgraded - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`) - [ ] Serializers have been changed - [x] New feature has been introduced - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]
snuyanzin commented on PR #24526: URL: https://github.com/apache/flink/pull/24526#issuecomment-2105169563 Maybe it is an unpopular opinion however I tend to think that `INTERSECT` vs `INTERSECT ALL` and the same for others for set and bag semantics is defined for rows and hardly could be applied for collections. I failed to find a standard approach for collections like arrays (I mean in the SQL Standard). Probably that is one of the reasons we could see a number of vendors are handling this differently. From one side we could say that we should follow the same approach as for rows. The problem I see here is that by default we will remove duplicates however what should we do if we want to keep them? There is no well known vendor providing both `array_intersect` and `array_intersect_all` or keep/remove duplicates as a parameter. At the same time, if we keep duplicates then we will still be able to cover both cases: we can do for the case with duplicates ``` array_intersect(array1, array2) ``` and for the case without ``` array_distinct(array_intersect(array1, array2)) ``` Yes, `array_union` looks like an exception here, however if we compare against vendors then it is a global exception just because there is a nice synonym which is used for another function `array_concat`. So if we want to concat arrays without duplicates we use `array_union` and then `array_concat`. The problem is that not every function has such a workaround. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35322][Connectors/Google PubSub] Remove weekly tests of 1.19 for unsupporting version v3.0 [flink-connector-gcp-pubsub]
snuyanzin commented on PR #26: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/26#issuecomment-2104973419 @vahmed-hamdy thanks for the contribution could you please schedule a weekly job on your fork to be sure it passes and put a link here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Flink 33461 [flink-connector-jdbc]
RocMarshal closed pull request #119: Flink 33461 URL: https://github.com/apache/flink-connector-jdbc/pull/119 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-35318) incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during predicate pushdown
[ https://issues.apache.org/jira/browse/FLINK-35318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan reassigned FLINK-35318: - Assignee: linshangquan (was: Jane Chan) > incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during > predicate pushdown > - > > Key: FLINK-35318 > URL: https://issues.apache.org/jira/browse/FLINK-35318 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.18.1 > Environment: flink version 1.18.1 > iceberg version 1.15.1 >Reporter: linshangquan >Assignee: linshangquan >Priority: Major > Attachments: image-2024-05-09-14-06-58-007.png, > image-2024-05-09-14-09-38-453.png, image-2024-05-09-14-11-38-476.png, > image-2024-05-09-14-22-14-417.png, image-2024-05-09-14-22-59-370.png, > image-2024-05-09-18-52-03-741.png, image-2024-05-09-18-52-28-584.png > > > In our scenario, we have an Iceberg table that contains a column named 'time' > of the {{timestamptz}} data type. This column has 10 rows of data where the > 'time' value is {{'2024-04-30 07:00:00'}} expressed in the "Asia/Shanghai" > timezone. > !image-2024-05-09-14-06-58-007.png! > > We encountered a strange phenomenon when accessing the table using > Iceberg-flink. > When the {{WHERE}} clause includes the {{time}} column, the results are > incorrect. > ZoneId.{_}systemDefault{_}() = "Asia/Shanghai" > !image-2024-05-09-18-52-03-741.png! > When there is no {{WHERE}} clause, the results are correct. > !image-2024-05-09-18-52-28-584.png! > During debugging, we found that when a {{WHERE}} clause is present, a > {{FilterPushDownSpec}} is generated, and this {{FilterPushDownSpec}} utilizes > {{RexNodeToExpressionConverter}} for translation. > !image-2024-05-09-14-11-38-476.png! > !image-2024-05-09-14-22-59-370.png! 
> When {{RexNodeToExpressionConverter#visitLiteral}} encounters a > {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type, it uses the specified timezone > "Asia/Shanghai" to convert the {{TimestampString}} type to an {{Instant}} > type. However, the upstream {{TimestampString}} data has already been > processed in UTC timezone. By applying the local timezone processing here, an > error occurs due to the mismatch in timezones. > Whether the handling of {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type of data in > {{RexNodeToExpressionConverter#visitLiteral}} is a bug, and whether it should > process the data in UTC timezone. > > Please help confirm if this is the issue, and if so, we can submit a patch to > fix it. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35318) incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during predicate pushdown
[ https://issues.apache.org/jira/browse/FLINK-35318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan reassigned FLINK-35318: - Assignee: Jane Chan > incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during > predicate pushdown > - > > Key: FLINK-35318 > URL: https://issues.apache.org/jira/browse/FLINK-35318 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.18.1 > Environment: flink version 1.18.1 > iceberg version 1.15.1 >Reporter: linshangquan >Assignee: Jane Chan >Priority: Major > Attachments: image-2024-05-09-14-06-58-007.png, > image-2024-05-09-14-09-38-453.png, image-2024-05-09-14-11-38-476.png, > image-2024-05-09-14-22-14-417.png, image-2024-05-09-14-22-59-370.png, > image-2024-05-09-18-52-03-741.png, image-2024-05-09-18-52-28-584.png > > > In our scenario, we have an Iceberg table that contains a column named 'time' > of the {{timestamptz}} data type. This column has 10 rows of data where the > 'time' value is {{'2024-04-30 07:00:00'}} expressed in the "Asia/Shanghai" > timezone. > !image-2024-05-09-14-06-58-007.png! > > We encountered a strange phenomenon when accessing the table using > Iceberg-flink. > When the {{WHERE}} clause includes the {{time}} column, the results are > incorrect. > ZoneId.{_}systemDefault{_}() = "Asia/Shanghai" > !image-2024-05-09-18-52-03-741.png! > When there is no {{WHERE}} clause, the results are correct. > !image-2024-05-09-18-52-28-584.png! > During debugging, we found that when a {{WHERE}} clause is present, a > {{FilterPushDownSpec}} is generated, and this {{FilterPushDownSpec}} utilizes > {{RexNodeToExpressionConverter}} for translation. > !image-2024-05-09-14-11-38-476.png! > !image-2024-05-09-14-22-59-370.png! > When {{RexNodeToExpressionConverter#visitLiteral}} encounters a > {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type, it uses the specified timezone > "Asia/Shanghai" to convert the {{TimestampString}} type to an {{Instant}} > type. 
However, the upstream {{TimestampString}} data has already been > processed in UTC timezone. By applying the local timezone processing here, an > error occurs due to the mismatch in timezones. > We would like to know whether the handling of {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type of data in > {{RexNodeToExpressionConverter#visitLiteral}} is a bug, and whether it should > instead process the data in the UTC timezone. > > Please help confirm if this is the issue, and if so, we can submit a patch to > fix it. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35318) incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during predicate pushdown
[ https://issues.apache.org/jira/browse/FLINK-35318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845408#comment-17845408 ] Jane Chan commented on FLINK-35318: --- Hi [~linshangquan], thanks for reporting this issue. Your understanding is correct. RexNodeToExpressionConverter#visitLiteral should not convert the literal to UTC again since this has been done before at the SQL to Rel phase. > incorrect timezone handling for TIMESTAMP_WITH_LOCAL_TIME_ZONE type during > predicate pushdown > - > > Key: FLINK-35318 > URL: https://issues.apache.org/jira/browse/FLINK-35318 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.18.1 > Environment: flink version 1.18.1 > iceberg version 1.15.1 >Reporter: linshangquan >Priority: Major > Attachments: image-2024-05-09-14-06-58-007.png, > image-2024-05-09-14-09-38-453.png, image-2024-05-09-14-11-38-476.png, > image-2024-05-09-14-22-14-417.png, image-2024-05-09-14-22-59-370.png, > image-2024-05-09-18-52-03-741.png, image-2024-05-09-18-52-28-584.png > > > In our scenario, we have an Iceberg table that contains a column named 'time' > of the {{timestamptz}} data type. This column has 10 rows of data where the > 'time' value is {{'2024-04-30 07:00:00'}} expressed in the "Asia/Shanghai" > timezone. > !image-2024-05-09-14-06-58-007.png! > > We encountered a strange phenomenon when accessing the table using > Iceberg-flink. > When the {{WHERE}} clause includes the {{time}} column, the results are > incorrect. > ZoneId.{_}systemDefault{_}() = "Asia/Shanghai" > !image-2024-05-09-18-52-03-741.png! > When there is no {{WHERE}} clause, the results are correct. > !image-2024-05-09-18-52-28-584.png! > During debugging, we found that when a {{WHERE}} clause is present, a > {{FilterPushDownSpec}} is generated, and this {{FilterPushDownSpec}} utilizes > {{RexNodeToExpressionConverter}} for translation. > !image-2024-05-09-14-11-38-476.png! 
> !image-2024-05-09-14-22-59-370.png! > When {{RexNodeToExpressionConverter#visitLiteral}} encounters a > {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type, it uses the specified timezone > "Asia/Shanghai" to convert the {{TimestampString}} type to an {{Instant}} > type. However, the upstream {{TimestampString}} data has already been > processed in UTC timezone. By applying the local timezone processing here, an > error occurs due to the mismatch in timezones. > Whether the handling of {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} type of data in > {{RexNodeToExpressionConverter#visitLiteral}} is a bug, and whether it should > process the data in UTC timezone. > > Please help confirm if this is the issue, and if so, we can submit a patch to > fix it. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35327) SQL Explain show push down condition
[ https://issues.apache.org/jira/browse/FLINK-35327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845395#comment-17845395 ] lincoln lee commented on FLINK-35327: - Hi [~loserwang1024], thanks for reporting this! Assigned to you [~xuyangzhong]. > SQL Explain show push down condition > - > > Key: FLINK-35327 > URL: https://issues.apache.org/jira/browse/FLINK-35327 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: Hongshun Wang >Assignee: xuyang >Priority: Minor > Fix For: 1.20.0 > > > Current, we can not determine whether filter/limit/partition condition is > pushed down to source. For example, we can only know filter condition is > pushed down if it is not included in Filter any more -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35327) SQL Explain show push down condition
[ https://issues.apache.org/jira/browse/FLINK-35327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee reassigned FLINK-35327: --- Assignee: xuyang > SQL Explain show push down condition > - > > Key: FLINK-35327 > URL: https://issues.apache.org/jira/browse/FLINK-35327 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: Hongshun Wang >Assignee: xuyang >Priority: Minor > Fix For: 1.20.0 > > > Currently, we cannot determine whether filter/limit/partition condition is > pushed down to source. For example, we can only know filter condition is > pushed down if it is not included in Filter anymore -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33759] flink parquet writer support write nested array or map type [flink]
xccui commented on PR #23881: URL: https://github.com/apache/flink/pull/23881#issuecomment-2104688811 Hi @LoveHeat, can you cherry-pick the change in https://github.com/apache/flink/pull/24029 from @ukby1234 and merge the two PRs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on PR #24760: URL: https://github.com/apache/flink/pull/24760#issuecomment-2104649932 Hi, @lsyldliu all comments have been addressed. Can you take a look again? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1596779798 ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ## @@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } +@Test +void testAlterMaterializedTableRefresh() throws Exception { +// initialize session handle, create test-filesystem catalog and register it to catalog +// store +SessionHandle sessionHandle = initializeSession(); + +String materializedTableDDL = +"CREATE MATERIALIZED TABLE users_shops" ++ " PARTITIONED BY (ds)\n" ++ " WITH(\n" ++ " 'format' = 'debezium-json'\n" ++ " )\n" ++ " FRESHNESS = INTERVAL '30' SECOND\n" ++ " AS SELECT \n" ++ " user_id,\n" ++ " shop_id,\n" ++ " ds,\n" Review Comment: use values source to check data overwrite -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]
empathy87 commented on PR #24659: URL: https://github.com/apache/flink/pull/24659#issuecomment-2104557693 > Thanks, let's wait for ci > > btw, @empathy87 could you please create backport PRs with this fix for 1.19 and 1.18? Yeh, sure, I will do backports! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35323) Only the schema of the first hit table is recorded when the source-table of the transformer hits multiple tables
[ https://issues.apache.org/jira/browse/FLINK-35323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35323: --- Labels: pull-request-available (was: ) > Only the schema of the first hit table is recorded when the source-table of > the transformer hits multiple tables > > > Key: FLINK-35323 > URL: https://issues.apache.org/jira/browse/FLINK-35323 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: 3.1.0 >Reporter: Wenkai Qi >Priority: Major > Labels: pull-request-available > Original Estimate: 168h > Remaining Estimate: 168h > > {code:java} > transform: > - source-table: mydb.web_\.* > projection: \*, localtimestamp as new_timestamp > description: project fields from source table {code} > Table mydb.web_order: col1, col2, col3 > Table mydb.web_info: col1, col4 > If transform data operator processes `mydb.web_info` first and then > `mydb.web_order`, its schema will always be `col1, col4`. > Cause by: TransformDataOperator.java > {code:java} > private transient Map > transformProjectionProcessorMap; > private transient Map > transformFilterProcessorMap; {code} > The relationship of `TableId` is missing here. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35323][runtime] Fix transform failure when one rule matches multiple schemas [flink-cdc]
yuxiqian commented on PR #3312: URL: https://github.com/apache/flink-cdc/pull/3312#issuecomment-2104532312 cc @aiwenmo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35302][rest] Ignore unknown fields in REST request deserialization [flink]
gaborgsomogyi commented on PR #24759: URL: https://github.com/apache/flink/pull/24759#issuecomment-2104485093 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31664] Implement ARRAY_INTERSECT function [flink]
liuyongvs commented on PR #24526: URL: https://github.com/apache/flink/pull/24526#issuecomment-2104468195 hi @dawidwys @snuyanzin WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][tests] Fix occasional pipeline E2e testcases failure [flink-cdc]
lvyanquan commented on PR #3309: URL: https://github.com/apache/flink-cdc/pull/3309#issuecomment-2104448042 LGTM, waiting for CI pass. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
[ https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan reassigned FLINK-35041: --- Assignee: Rui Fan (was: Feifan Wang) > IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed > -- > > Key: FLINK-35041 > URL: https://issues.apache.org/jira/browse/FLINK-35041 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Assignee: Rui Fan >Priority: Blocker > Labels: pull-request-available > > {code:java} > Apr 08 03:22:45 03:22:45.450 [ERROR] > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration > -- Time elapsed: 0.034 s <<< FAILURE! > Apr 08 03:22:45 org.opentest4j.AssertionFailedError: > Apr 08 03:22:45 > Apr 08 03:22:45 expected: false > Apr 08 03:22:45 but was: true > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > Apr 08 03:22:45 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211) > Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498) > Apr 08 03:22:45 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Apr 08 03:22:45 at > 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {code} > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35030][runtime] Introduce Epoch Manager under async execution [flink]
fredia commented on code in PR #24748: URL: https://github.com/apache/flink/pull/24748#discussion_r1596619045 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/EpochManager.java: ## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import javax.annotation.Nullable; + +import java.util.LinkedList; + +/** + * Epoch manager segments inputs into distinct epochs, marked by the arrival of non-records(e.g. + * watermark, record attributes). Records are assigned to a unique epoch based on their arrival, + * records within an epoch are allowed to be parallelized, while the non-record of an epoch can only + * be executed when all records in this epoch have finished. + * + * For more details please refer to FLIP-425. + */ +public class EpochManager { + +/** + * This enum defines whether parallel execution between epochs is allowed. We should keep this + * internal and away from API module for now, until we could see the concrete need for {@link + * #PARALLEL_BETWEEN_EPOCH} from average users. + */ +public enum ParallelMode { +/** + * Subsequent epochs must wait until the previous epoch is completed before they can start. 
+ */ +SERIAL_BETWEEN_EPOCH, +/** + * Subsequent epochs can begin execution even if the previous epoch has not yet completed. + * Usually performs better than {@link #SERIAL_BETWEEN_EPOCH}. + */ +PARALLEL_BETWEEN_EPOCH +} + +/** + * The reference to the {@link AsyncExecutionController}, used for {@link + * ParallelMode#SERIAL_BETWEEN_EPOCH}. Can be null when testing. + */ +final AsyncExecutionController asyncExecutionController; + +/** The number of epochs that have arrived. */ +long epochNum; + +/** The output queue to hold ongoing epochs. */ +LinkedList outputQueue; + +/** Current active epoch, only one active epoch at the same time. */ +Epoch activeEpoch; + +public EpochManager(AsyncExecutionController aec) { +this.epochNum = 0; +this.outputQueue = new LinkedList<>(); +// preset an empty epoch, the epoch action will be updated when non-record is received. +this.activeEpoch = new Epoch(epochNum++); +this.outputQueue.add(activeEpoch); +this.asyncExecutionController = aec; +} + +/** + * Add a record to the current epoch and return the current open epoch, the epoch will be + * associated with the {@link RecordContext} of this record. Must be invoked within task thread. + * + * @return the current open epoch. + */ +public Epoch onRecord() { +activeEpoch.ongoingRecordCount++; +return activeEpoch; +} + +/** + * Add a non-record to the current epoch, close current epoch and open a new epoch. Must be + * invoked within task thread. + * + * @param action the action associated with this non-record. + * @param parallelMode the parallel mode for this epoch. + */ +public void onNonRecord(Runnable action, ParallelMode parallelMode) { Review Comment: Because there is no exception in the signature of `disposeContext()` , if we change Runnable to ThrowingRunnable here, we need to catch the exception in `aec.disposeContext()` or `epochManager.completeOneRecord()`. 
I think that catching exceptions in `StreamOperator` is more convenient ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -311,6 +318,10 @@ public void drainInflightRecords(int targetNum) { } } +public EpochManager getEpochManager() { +return epochManager; Review Comment: 👍Thanks for the suggestion, I added `processNonRecord()` for aec, `EpochManager` is not visible for operators now. ## flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/EpochManagerTest.java: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work
Re: [PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
yuxiqian commented on PR #3310: URL: https://github.com/apache/flink-cdc/pull/3310#issuecomment-2104414800 @leonardBang @GOODBOY008 PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]
flinkbot commented on PR #24770: URL: https://github.com/apache/flink/pull/24770#issuecomment-2104413730 ## CI report: * 82bb2cc9da16f2a3b7c1293c78412d503ca3ea80 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]
1996fanrui commented on PR #24770: URL: https://github.com/apache/flink/pull/24770#issuecomment-2104409349 Hi @masteryhx @Zakelly , would you mind helping review this PR in your free time? It's a blocker of flink 1.20, and IIUC it's caused by https://github.com/apache/flink/pull/24480. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
[ https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35041: --- Labels: pull-request-available (was: ) > IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed > -- > > Key: FLINK-35041 > URL: https://issues.apache.org/jira/browse/FLINK-35041 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Assignee: Feifan Wang >Priority: Blocker > Labels: pull-request-available > > {code:java} > Apr 08 03:22:45 03:22:45.450 [ERROR] > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration > -- Time elapsed: 0.034 s <<< FAILURE! > Apr 08 03:22:45 org.opentest4j.AssertionFailedError: > Apr 08 03:22:45 > Apr 08 03:22:45 expected: false > Apr 08 03:22:45 but was: true > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > Apr 08 03:22:45 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211) > Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498) > Apr 08 03:22:45 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Apr 08 03:22:45 at > 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {code} > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed [flink]
1996fanrui opened a new pull request, #24770: URL: https://github.com/apache/flink/pull/24770 ## What is the purpose of the change TestingSegmentFileStateHandle.EMPTY_INSTANCE is static, it means other test can update disposed. TestingSegmentFileStateHandle object shouldn't be shared in global scope. ## Brief change log [FLINK-35041][test] Fix the IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35322][Connectors/Google PubSub] Remove weekly tests of 1.19 for unsupporting version v3.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on PR #26: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/26#issuecomment-2104372982 @snuyanzin Could you please take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35322][Connectors/Google PubSub] Remove weekly tests of 1.19 for unsupporting version v3.0 [flink-connector-gcp-pubsub]
boring-cyborg[bot] commented on PR #26: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/26#issuecomment-2104371048 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35322) PubSub Connector Weekly build fails
[ https://issues.apache.org/jira/browse/FLINK-35322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35322: --- Labels: pull-request-available test-stability (was: test-stability) > PubSub Connector Weekly build fails > > > Key: FLINK-35322 > URL: https://issues.apache.org/jira/browse/FLINK-35322 > Project: Flink > Issue Type: Bug > Components: Connectors / Google Cloud PubSub >Affects Versions: 3.1.0 >Reporter: Ahmed Hamdy >Priority: Major > Labels: pull-request-available, test-stability > > Weekly builds for GCP pubSub connector is failing for 1.19 due to compilation > error in tests. > https://github.com/apache/flink-connector-gcp-pubsub/actions/runs/8768752932/job/24063472769 > https://github.com/apache/flink-connector-gcp-pubsub/actions/runs/8863605354 > https://github.com/apache/flink-connector-gcp-pubsub/actions/runs/8954270618 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35293][hive] Hive source supports dynamic parallelism inference [flink]
SinBex commented on code in PR #24764: URL: https://github.com/apache/flink/pull/24764#discussion_r1596561578 ## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveDynamicParallelismInferenceFactory.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.hive; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.table.catalog.ObjectPath; + +import org.apache.hadoop.mapred.JobConf; + +/** + * The factory class for {@link HiveParallelismInference} to support Hive source dynamic parallelism + * inference. 
+ */ +class HiveDynamicParallelismInferenceFactory implements HiveParallelismInference.Provider { + +private final ObjectPath tablePath; +private final JobConf jobConf; +private final int globalMaxParallelism; + +HiveDynamicParallelismInferenceFactory( +ObjectPath tablePath, JobConf jobConf, int globalMaxParallelism) { +this.tablePath = tablePath; +this.jobConf = jobConf; +this.globalMaxParallelism = globalMaxParallelism; +} + +@Override +public HiveParallelismInference create() { +boolean inferEnabled = +jobConf.getBoolean( + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.key(), + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM.defaultValue()); +HiveOptions.InferMode inferMode = +jobConf.getEnum( + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.key(), + HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MODE.defaultValue()); Review Comment: I overlooked this issue during testing. Thank you very much for pointing it out. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35306) Flink cannot compile with jdk17
[ https://issues.apache.org/jira/browse/FLINK-35306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845275#comment-17845275 ] Ryan Skraba commented on FLINK-35306: - Thanks for the quick fix! Just to document this, we saw the compilation fail on GitHub Actions too: * 1.20 Java 17 / Compile https://github.com/apache/flink/commit/29736b8c01924b7da03d4bcbfd9c812a8e5a08b4/checks/24709533133/logs > Flink cannot compile with jdk17 > --- > > Key: FLINK-35306 > URL: https://issues.apache.org/jira/browse/FLINK-35306 > Project: Flink > Issue Type: Bug > Components: Build System / CI, Tests >Affects Versions: 1.20.0 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Blocker > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: image-2024-05-08-11-48-04-161.png > > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59380&view=results] > fails and benchmark with 17 fails as well > > Reason: TypeSerializerUpgradeTestBase.UpgradeVerifier update the > schemaCompatibilityMatcher method name to schemaCompatibilityCondition, but > some subclasses didn't change it, such as: > PojoRecordSerializerUpgradeTestSpecifications.PojoToRecordVerifier. > > It belongs to flink-tests-java17 module, and it doesn't compile by default. > > it's caused by > * https://issues.apache.org/jira/browse/FLINK-25537 > * [https://github.com/apache/flink/pull/24603] > > !image-2024-05-08-11-48-04-161.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
[ https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845274#comment-17845274 ] Ryan Skraba commented on FLINK-35041: - * 1.20 Hadoop 3.1.3 / Test (module: core) https://github.com/apache/flink/actions/runs/9026237714/job/24803537384#step:10:8419 * 1.20 Java 21 / Test (module: core) https://github.com/apache/flink/actions/runs/9011311875/job/24758973855#step:10:8334 * 1.20 Default (Java 8) / Test (module: core) https://github.com/apache/flink/actions/runs/8999811164/job/24723153060#step:10:8487 * 1.20 Default (Java 8) / Test (module: core) https://github.com/apache/flink/actions/runs/8997755665/job/24716975457#step:10:9046 * 1.20 Java 11 / Test (module: core) https://github.com/apache/flink/actions/runs/8995101420/job/24709819637#step:10:8738 * 1.20 Java 21 / Test (module: core) https://github.com/apache/flink/actions/runs/8995101420/job/24709801069#step:10:8940 * 1.20 Default (Java 8) / Test (module: core) https://github.com/apache/flink/actions/runs/8985327313/job/24679590248#step:10:8686 > IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed > -- > > Key: FLINK-35041 > URL: https://issues.apache.org/jira/browse/FLINK-35041 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.20.0 >Reporter: Weijie Guo >Assignee: Feifan Wang >Priority: Blocker > > {code:java} > Apr 08 03:22:45 03:22:45.450 [ERROR] > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration > -- Time elapsed: 0.034 s <<< FAILURE! 
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: > Apr 08 03:22:45 > Apr 08 03:22:45 expected: false > Apr 08 03:22:45 but was: true > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > Apr 08 03:22:45 at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > Apr 08 03:22:45 at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34) > Apr 08 03:22:45 at > org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211) > Apr 08 03:22:45 at java.lang.reflect.Method.invoke(Method.java:498) > Apr 08 03:22:45 at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > Apr 08 03:22:45 at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) > {code} > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=125e07e7-8de0-5c6c-a541-a567415af3ef&l=9238] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35002) GitHub action request timeout to ArtifactService
[ https://issues.apache.org/jira/browse/FLINK-35002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845273#comment-17845273 ] Ryan Skraba commented on FLINK-35002: - * 1.19 Java 11 / Compile https://github.com/apache/flink/commit/fa426f104baa1343a07695dcf4c4984814f0fde4/checks/24803211419/logs * 1.18 Java 11 / Test (module: connect) https://github.com/apache/flink/commit/9d0858ee745bc835efa78a34d849d5f3ecb89f6d/checks/24709868165/logs > GitHub action request timeout to ArtifactService > - > > Key: FLINK-35002 > URL: https://issues.apache.org/jira/browse/FLINK-35002 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Ryan Skraba >Priority: Major > Labels: github-actions, test-stability > > A timeout can occur when uploading a successfully built artifact: > * [https://github.com/apache/flink/actions/runs/8516411871/job/23325392650] > {code:java} > 2024-04-02T02:20:15.6355368Z With the provided path, there will be 1 file > uploaded > 2024-04-02T02:20:15.6360133Z Artifact name is valid! > 2024-04-02T02:20:15.6362872Z Root directory input is valid! > 2024-04-02T02:20:20.6975036Z Attempt 1 of 5 failed with error: Request > timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. > Retrying request in 3000 ms... > 2024-04-02T02:20:28.7084937Z Attempt 2 of 5 failed with error: Request > timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. > Retrying request in 4785 ms... > 2024-04-02T02:20:38.5015936Z Attempt 3 of 5 failed with error: Request > timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. > Retrying request in 7375 ms... > 2024-04-02T02:20:50.8901508Z Attempt 4 of 5 failed with error: Request > timeout: /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact. > Retrying request in 14988 ms... 
> 2024-04-02T02:21:10.9028438Z ##[error]Failed to CreateArtifact: Failed to > make request after 5 attempts: Request timeout: > /twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact > 2024-04-02T02:22:59.9893296Z Post job cleanup. > 2024-04-02T02:22:59.9958844Z Post job cleanup. {code} > (This is unlikely to be something we can fix, but we can track it.) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34404) GroupWindowAggregateProcTimeRestoreTest#testRestore times out
[ https://issues.apache.org/jira/browse/FLINK-34404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845271#comment-17845271 ] Ryan Skraba commented on FLINK-34404: - (Using the same command line as above) * 1.20 Default (Java 8) / Test (module: table) https://github.com/apache/flink/actions/runs/8999811164/job/24723153970#step:10:12716 > GroupWindowAggregateProcTimeRestoreTest#testRestore times out > - > > Key: FLINK-34404 > URL: https://issues.apache.org/jira/browse/FLINK-34404 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Assignee: Alan Sheinberg >Priority: Critical > Labels: test-stability > Attachments: FLINK-34404.failure.log, FLINK-34404.success.log > > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57357&view=logs&j=32715a4c-21b8-59a3-4171-744e5ab107eb&t=ff64056b-5320-5afe-c22c-6fa339e59586&l=11603 > {code} > Feb 07 02:17:40 "ForkJoinPool-74-worker-1" #382 daemon prio=5 os_prio=0 > cpu=282.22ms elapsed=961.78s tid=0x7f880a485c00 nid=0x6745 waiting on > condition [0x7f878a6f9000] > Feb 07 02:17:40java.lang.Thread.State: WAITING (parking) > Feb 07 02:17:40 at > jdk.internal.misc.Unsafe.park(java.base@17.0.7/Native Method) > Feb 07 02:17:40 - parking to wait for <0xff73d060> (a > java.util.concurrent.CompletableFuture$Signaller) > Feb 07 02:17:40 at > java.util.concurrent.locks.LockSupport.park(java.base@17.0.7/LockSupport.java:211) > Feb 07 02:17:40 at > java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.7/CompletableFuture.java:1864) > Feb 07 02:17:40 at > java.util.concurrent.ForkJoinPool.compensatedBlock(java.base@17.0.7/ForkJoinPool.java:3449) > Feb 07 02:17:40 at > java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.7/ForkJoinPool.java:3432) > Feb 07 02:17:40 at > java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.7/CompletableFuture.java:1898) > Feb 07 02:17:40 at 
> java.util.concurrent.CompletableFuture.get(java.base@17.0.7/CompletableFuture.java:2072) > Feb 07 02:17:40 at > org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase.testRestore(RestoreTestBase.java:292) > Feb 07 02:17:40 at > jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@17.0.7/Native > Method) > Feb 07 02:17:40 at > jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@17.0.7/NativeMethodAccessorImpl.java:77) > Feb 07 02:17:40 at > jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@17.0.7/DelegatingMethodAccessorImpl.java:43) > Feb 07 02:17:40 at > java.lang.reflect.Method.invoke(java.base@17.0.7/Method.java:568) > Feb 07 02:17:40 at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34273) git fetch fails
[ https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845270#comment-17845270 ] Ryan Skraba commented on FLINK-34273: - * 1.20 test_cron_hadoop313 connect https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59424&view=logs&j=b6f8a893-8f59-51d5-fe28-fb56a8b0932c&t=a2aa31b1-3076-5dd3-ea01-4a81e1467181&l=384 * 1.20 test_ci misc https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59405&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=10163a1a-ea71-5414-a832-7701bff37ba3&l=380 > git fetch fails > --- > > Key: FLINK-34273 > URL: https://issues.apache.org/jira/browse/FLINK-34273 > Project: Flink > Issue Type: Bug > Components: Build System / CI, Test Infrastructure >Affects Versions: 1.19.0, 1.18.1, 1.20.0 >Reporter: Matthias Pohl >Priority: Major > Labels: test-stability > > We've seen multiple {{git fetch}} failures. I assume this to be an > infrastructure issue. This Jira issue is for documentation purposes. > {code:java} > error: RPC failed; curl 18 transfer closed with outstanding read data > remaining > error: 5211 bytes of body are still expected > fetch-pack: unexpected disconnect while reading sideband packet > fatal: early EOF > fatal: fetch-pack: invalid index-pack output {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=5d6dc3d3-393d-5111-3a40-c6a5a36202e6&l=667 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34224) ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest timed out
[ https://issues.apache.org/jira/browse/FLINK-34224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845269#comment-17845269 ] Ryan Skraba commented on FLINK-34224: - * 1.18 Hadoop 3.1.3 / Test (module: core) https://github.com/apache/flink/actions/runs/9011311755/job/24759083100#step:10:10641 > ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest > timed out > --- > > Key: FLINK-34224 > URL: https://issues.apache.org/jira/browse/FLINK-34224 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.19.0, 1.18.1 >Reporter: Matthias Pohl >Priority: Major > Labels: github-actions, test-stability > > The timeout appeared in the GitHub Actions workflow (currently in test phase; > [FLIP-396|https://cwiki.apache.org/confluence/display/FLINK/FLIP-396%3A+Trial+to+test+GitHub+Actions+as+an+alternative+for+Flink%27s+current+Azure+CI+infrastructure]): > https://github.com/XComp/flink/actions/runs/7632434859/job/20793613726#step:10:11040 > {code} > Jan 24 01:38:36 "ForkJoinPool-1-worker-1" #16 daemon prio=5 os_prio=0 > tid=0x7f3b200ae800 nid=0x406e3 waiting on condition [0x7f3b1ba0e000] > Jan 24 01:38:36java.lang.Thread.State: WAITING (parking) > Jan 24 01:38:36 at sun.misc.Unsafe.park(Native Method) > Jan 24 01:38:36 - parking to wait for <0xdfbbb358> (a > java.util.concurrent.CompletableFuture$Signaller) > Jan 24 01:38:36 at > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > Jan 24 01:38:36 at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) > Jan 24 01:38:36 at > java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3313) > Jan 24 01:38:36 at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) > Jan 24 01:38:36 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > Jan 24 01:38:36 at > 
org.apache.flink.changelog.fs.ChangelogStorageMetricsTest.testAttemptsPerUpload(ChangelogStorageMetricsTest.java:251) > Jan 24 01:38:36 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-26644) python StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies failed on azure
[ https://issues.apache.org/jira/browse/FLINK-26644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845268#comment-17845268 ] Ryan Skraba commented on FLINK-26644: - * 1.20 Java 11 / Test (module: python) https://github.com/apache/flink/actions/runs/9011311875/job/24758993707#step:10:25072 > python > StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies > failed on azure > --- > > Key: FLINK-26644 > URL: https://issues.apache.org/jira/browse/FLINK-26644 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.14.4, 1.15.0, 1.16.0, 1.19.0 >Reporter: Yun Gao >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > {code:java} > 2022-03-14T18:50:24.6842853Z Mar 14 18:50:24 > === FAILURES > === > 2022-03-14T18:50:24.6844089Z Mar 14 18:50:24 _ > StreamExecutionEnvironmentTests.test_generate_stream_graph_with_dependencies _ > 2022-03-14T18:50:24.6844846Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6846063Z Mar 14 18:50:24 self = > testMethod=test_generate_stream_graph_with_dependencies> > 2022-03-14T18:50:24.6847104Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6847766Z Mar 14 18:50:24 def > test_generate_stream_graph_with_dependencies(self): > 2022-03-14T18:50:24.6848677Z Mar 14 18:50:24 python_file_dir = > os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4())) > 2022-03-14T18:50:24.6849833Z Mar 14 18:50:24 os.mkdir(python_file_dir) > 2022-03-14T18:50:24.6850729Z Mar 14 18:50:24 python_file_path = > os.path.join(python_file_dir, "test_stream_dependency_manage_lib.py") > 2022-03-14T18:50:24.6852679Z Mar 14 18:50:24 with > open(python_file_path, 'w') as f: > 2022-03-14T18:50:24.6853646Z Mar 14 18:50:24 f.write("def > add_two(a):\nreturn a + 2") > 2022-03-14T18:50:24.6854394Z Mar 14 18:50:24 env = self.env > 2022-03-14T18:50:24.6855019Z Mar 14 18:50:24 > env.add_python_file(python_file_path) > 2022-03-14T18:50:24.6855519Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6856254Z 
Mar 14 18:50:24 def plus_two_map(value): > 2022-03-14T18:50:24.6857045Z Mar 14 18:50:24 from > test_stream_dependency_manage_lib import add_two > 2022-03-14T18:50:24.6857865Z Mar 14 18:50:24 return value[0], > add_two(value[1]) > 2022-03-14T18:50:24.6858466Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6858924Z Mar 14 18:50:24 def add_from_file(i): > 2022-03-14T18:50:24.6859806Z Mar 14 18:50:24 with > open("data/data.txt", 'r') as f: > 2022-03-14T18:50:24.6860266Z Mar 14 18:50:24 return i[0], > i[1] + int(f.read()) > 2022-03-14T18:50:24.6860879Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6862022Z Mar 14 18:50:24 from_collection_source = > env.from_collection([('a', 0), ('b', 0), ('c', 1), ('d', 1), > 2022-03-14T18:50:24.6863259Z Mar 14 18:50:24 > ('e', 2)], > 2022-03-14T18:50:24.6864057Z Mar 14 18:50:24 > type_info=Types.ROW([Types.STRING(), > 2022-03-14T18:50:24.6864651Z Mar 14 18:50:24 > Types.INT()])) > 2022-03-14T18:50:24.6865150Z Mar 14 18:50:24 > from_collection_source.name("From Collection") > 2022-03-14T18:50:24.6866212Z Mar 14 18:50:24 keyed_stream = > from_collection_source.key_by(lambda x: x[1], key_type=Types.INT()) > 2022-03-14T18:50:24.6867083Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6867793Z Mar 14 18:50:24 plus_two_map_stream = > keyed_stream.map(plus_two_map).name("Plus Two Map").set_parallelism(3) > 2022-03-14T18:50:24.6868620Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6869412Z Mar 14 18:50:24 add_from_file_map = > plus_two_map_stream.map(add_from_file).name("Add From File Map") > 2022-03-14T18:50:24.6870239Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6870883Z Mar 14 18:50:24 test_stream_sink = > add_from_file_map.add_sink(self.test_sink).name("Test Sink") > 2022-03-14T18:50:24.6871803Z Mar 14 18:50:24 > test_stream_sink.set_parallelism(4) > 2022-03-14T18:50:24.6872291Z Mar 14 18:50:24 > 2022-03-14T18:50:24.6872756Z Mar 14 18:50:24 archive_dir_path = > os.path.join(self.tempdir, "archive_" + str(uuid.uuid4())) > 2022-03-14T18:50:24.6873557Z Mar 14 18:50:24 > 
os.mkdir(archive_dir_path) > 2022-03-14T18:50:24.6874817Z Mar 14 18:50:24 with > open(os.path.join(archive_dir_path, "data.txt"), 'w') as f: > 2022-03-14T18:50:24.6875414Z Mar 14 18:50:24 f.write("3") > 2022-03-14T18:50:24.68
[jira] [Commented] (FLINK-35237) Allow Sink to Choose HashFunction in PrePartitionOperator
[ https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845267#comment-17845267 ] zhangdingxin commented on FLINK-35237: -- If the maintainers agree that this improvement is worthwhile, I would be happy to take it on. Please feel free to assign the issue to me. > Allow Sink to Choose HashFunction in PrePartitionOperator > - > > Key: FLINK-35237 > URL: https://issues.apache.org/jira/browse/FLINK-35237 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: zhangdingxin >Priority: Major > > The {{PrePartitionOperator}} in its current implementation only supports a > fixed {{HashFunction}} > ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}). > This limits the ability of Sink implementations to customize the > partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of > partitioned tables, it would be advantageous to allow hashing based on > partition keys, hashing according to table names, or using the database > engine's internal primary key hash functions (such as with MaxCompute > DataSink). > When users require such custom partitioning logic, they are compelled to > implement their PartitionOperator, which undermines the utility of > {{{}PrePartitionOperator{}}}. > To address this limitation, it would be highly desirable to enable the > {{PrePartitionOperator}} to support user-specified custom > {{{}HashFunction{}}}s (Function). A possible > solution could involve a mechanism analogous to the {{DataSink}} interface, > allowing the specification of a {{HashFunctionProvider}} class path in the > configuration file. This enhancement would greatly facilitate users in > tailoring partition strategies to meet their specific application needs. 
> In this case, I want to create new class {{HashFunctionProvider}} and > {{{}HashFunction{}}}: > {code:java} > public interface HashFunctionProvider { > HashFunction getHashFunction(Schema schema); > } > public interface HashFunction extends Function { > Integer apply(DataChangeEvent event); > } {code} > add {{getHashFunctionProvider}} method to {{DataSink}} > > {code:java} > public interface DataSink { > /** Get the {@link EventSinkProvider} for writing changed data to > external systems. */ > EventSinkProvider getEventSinkProvider(); > /** Get the {@link MetadataApplier} for applying metadata changes to > external systems. */ > MetadataApplier getMetadataApplier(); > default HashFunctionProvider getHashFunctionProvider() { > return new DefaultHashFunctionProvider(); > } > } {code} > and re-implement {{PrePartitionOperator}} {{recreateHashFunction}} method. > {code:java} > private HashFunction recreateHashFunction(TableId tableId) { > return > hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId)); > } {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]
snuyanzin commented on PR #24659: URL: https://github.com/apache/flink/pull/24659#issuecomment-2104255794 Thanks, let's wait for CI. By the way, @empathy87, could you please create backport PRs with this fix for 1.19 and 1.18? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish [flink]
1996fanrui commented on code in PR #24757: URL: https://github.com/apache/flink/pull/24757#discussion_r1596434203 ## flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java: ## @@ -201,8 +201,11 @@ void announceCombinedWatermark() { // to ready task to avoid period task fail (Java-ThreadPoolExecutor will not schedule // the period task if it throws an exception). for (Integer subtaskId : subTaskIds) { -context.sendEventToSourceOperatorIfTaskReady( -subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark)); +// when subtask have been finished, do not send event. +if (!context.hasNoMoreSplits(subtaskId)) { Review Comment: Does `NoMoreSplits` mean `the subtask has finished`? I'm afraid they are not the same. For example, some subtasks don't have any splits when the parallelism of the Kafka source is greater than the number of Kafka partitions. I'm not sure whether we need this change. IIUC, `context.sendEventToSourceOperatorIfTaskReady` already has a check: it only sends the event when the `gateway` is not null. That means the coordinator doesn't send the event to a subtask when that subtask is finished, right? Please correct me if anything is wrong, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35309) Enable Notice file ci check and fix Notice
[ https://issues.apache.org/jira/browse/FLINK-35309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845257#comment-17845257 ] Qingsheng Ren commented on FLINK-35309: --- CDC release-3.1: e452d66b3bfabbab9875d8d5be6f6262749ff30f..62cc62ef722fbe47277704e45166aefaee8d1ac4 > Enable Notice file ci check and fix Notice > --- > > Key: FLINK-35309 > URL: https://issues.apache.org/jira/browse/FLINK-35309 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Zhongqiang Gong >Assignee: Zhongqiang Gong >Priority: Blocker > Labels: pull-request-available > Fix For: cdc-3.1.0 > > > Changes: > * Add ci to check Notice file > * Fix Notice file issue -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]
empathy87 commented on code in PR #24659: URL: https://github.com/apache/flink/pull/24659#discussion_r1596500029 ## flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java: ## @@ -86,4 +83,49 @@ public void testApplyPredicate() { OrcFilters.Predicate predicate8 = new OrcFilters.And(predicate4, predicate6); assertThat(predicate7).hasToString(predicate8.toString()); } + +@Test +@SuppressWarnings("unchecked") +public void testApplyPredicateReverse() { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35309][cdc] Fix license and NOTICE files and add CI check in Flink CDC [flink-cdc]
PatrickRen merged PR #3307: URL: https://github.com/apache/flink-cdc/pull/3307 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-35109) Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support for 1.17 and 1.18
[ https://issues.apache.org/jira/browse/FLINK-35109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi reassigned FLINK-35109: --- Assignee: Fabian Paul (was: Ufuk Celebi) > Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support > for 1.17 and 1.18 > --- > > Key: FLINK-35109 > URL: https://issues.apache.org/jira/browse/FLINK-35109 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Reporter: Martijn Visser >Assignee: Fabian Paul >Priority: Blocker > Fix For: kafka-4.0.0 > > > The Flink Kafka connector currently can't compile against Flink > 1.20-SNAPSHOT. An example failure can be found at > https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169 > The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, > see FLINK-32455. See also specifically the comment in > https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785 > Next to that, there's also FLINK-25509 which can only be supported with Flink > 1.19 and higher. > So we should: > * Drop support for 1.17 and 1.18 > * Refactor the Flink Kafka connector to use the new > {code:java}MigrationTest{code} > We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; > this change will be a new v4.0 version with support for Flink 1.19 and the > upcoming Flink 1.20 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35237) Allow Sink to Choose HashFunction in PrePartitionOperato
[ https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangdingxin updated FLINK-35237: - Summary: Allow Sink to Choose HashFunction in PrePartitionOperato (was: Allow Sink to Choose HashFunction in PrePartitionOperator for Flink Sink Customization) > Allow Sink to Choose HashFunction in PrePartitionOperato > > > Key: FLINK-35237 > URL: https://issues.apache.org/jira/browse/FLINK-35237 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: zhangdingxin >Priority: Major > > The {{PrePartitionOperator}} in its current implementation only supports a > fixed {{HashFunction}} > ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}). > This limits the ability of Sink implementations to customize the > partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of > partitioned tables, it would be advantageous to allow hashing based on > partition keys, hashing according to table names, or using the database > engine's internal primary key hash functions (such as with MaxCompute > DataSink). > When users require such custom partitioning logic, they are compelled to > implement their PartitionOperator, which undermines the utility of > {{{}PrePartitionOperator{}}}. > To address this limitation, it would be highly desirable to enable the > {{PrePartitionOperator}} to support user-specified custom > {{{}HashFunction{}}}s (Function). A possible > solution could involve a mechanism analogous to the {{DataSink}} interface, > allowing the specification of a {{HashFunctionProvider}} class path in the > configuration file. This enhancement would greatly facilitate users in > tailoring partition strategies to meet their specific application needs. 
> In this case, I want to create new class {{HashFunctionProvider}} and > {{{}HashFunction{}}}: > {code:java} > public interface HashFunctionProvider { > HashFunction getHashFunction(Schema schema); > } > public interface HashFunction extends Function { > Integer apply(DataChangeEvent event); > } {code} > add {{getHashFunctionProvider}} method to {{DataSink}} > > {code:java} > public interface DataSink { > /** Get the {@link EventSinkProvider} for writing changed data to > external systems. */ > EventSinkProvider getEventSinkProvider(); > /** Get the {@link MetadataApplier} for applying metadata changes to > external systems. */ > MetadataApplier getMetadataApplier(); > default HashFunctionProvider getHashFunctionProvider() { > return new DefaultHashFunctionProvider(); > } > } {code} > and re-implement {{PrePartitionOperator}} {{recreateHashFunction}} method. > {code:java} > private HashFunction recreateHashFunction(TableId tableId) { > return > hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId)); > } {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35237) Allow Sink to Choose HashFunction in PrePartitionOperator
[ https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangdingxin updated FLINK-35237: - Summary: Allow Sink to Choose HashFunction in PrePartitionOperator (was: Allow Sink to Choose HashFunction in PrePartitionOperato) > Allow Sink to Choose HashFunction in PrePartitionOperator > - > > Key: FLINK-35237 > URL: https://issues.apache.org/jira/browse/FLINK-35237 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: zhangdingxin >Priority: Major > > The {{PrePartitionOperator}} in its current implementation only supports a > fixed {{HashFunction}} > ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}). > This limits the ability of Sink implementations to customize the > partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of > partitioned tables, it would be advantageous to allow hashing based on > partition keys, hashing according to table names, or using the database > engine's internal primary key hash functions (such as with MaxCompute > DataSink). > When users require such custom partitioning logic, they are compelled to > implement their PartitionOperator, which undermines the utility of > {{{}PrePartitionOperator{}}}. > To address this limitation, it would be highly desirable to enable the > {{PrePartitionOperator}} to support user-specified custom > {{{}HashFunction{}}}s (Function). A possible > solution could involve a mechanism analogous to the {{DataSink}} interface, > allowing the specification of a {{HashFunctionProvider}} class path in the > configuration file. This enhancement would greatly facilitate users in > tailoring partition strategies to meet their specific application needs. 
> In this case, I want to create new class {{HashFunctionProvider}} and > {{{}HashFunction{}}}: > {code:java} > public interface HashFunctionProvider { > HashFunction getHashFunction(Schema schema); > } > public interface HashFunction extends Function { > Integer apply(DataChangeEvent event); > } {code} > add {{getHashFunctionProvider}} method to {{DataSink}} > > {code:java} > public interface DataSink { > /** Get the {@link EventSinkProvider} for writing changed data to > external systems. */ > EventSinkProvider getEventSinkProvider(); > /** Get the {@link MetadataApplier} for applying metadata changes to > external systems. */ > MetadataApplier getMetadataApplier(); > default HashFunctionProvider getHashFunctionProvider() { > return new DefaultHashFunctionProvider(); > } > } {code} > and re-implement {{PrePartitionOperator}} {{recreateHashFunction}} method. > {code:java} > private HashFunction recreateHashFunction(TableId tableId) { > return > hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId)); > } {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35237) Allow Sink to Choose HashFunction in PrePartitionOperator for Flink Sink Customization
[ https://issues.apache.org/jira/browse/FLINK-35237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangdingxin updated FLINK-35237: - Description: The {{PrePartitionOperator}} in its current implementation only supports a fixed {{HashFunction}} ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}). This limits the ability of Sink implementations to customize the partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of partitioned tables, it would be advantageous to allow hashing based on partition keys, hashing according to table names, or using the database engine's internal primary key hash functions (such as with MaxCompute DataSink). When users require such custom partitioning logic, they are compelled to implement their PartitionOperator, which undermines the utility of {{{}PrePartitionOperator{}}}. To address this limitation, it would be highly desirable to enable the {{PrePartitionOperator}} to support user-specified custom {{{}HashFunction{}}}s (Function). A possible solution could involve a mechanism analogous to the {{DataSink}} interface, allowing the specification of a {{HashFunctionProvider}} class path in the configuration file. This enhancement would greatly facilitate users in tailoring partition strategies to meet their specific application needs. In this case, I want to create new class {{HashFunctionProvider}} and {{{}HashFunction{}}}: {code:java} public interface HashFunctionProvider { HashFunction getHashFunction(Schema schema); } public interface HashFunction extends Function { Integer apply(DataChangeEvent event); } {code} add {{getHashFunctionProvider}} method to {{DataSink}} {code:java} public interface DataSink { /** Get the {@link EventSinkProvider} for writing changed data to external systems. */ EventSinkProvider getEventSinkProvider(); /** Get the {@link MetadataApplier} for applying metadata changes to external systems. 
*/ MetadataApplier getMetadataApplier(); default HashFunctionProvider getHashFunctionProvider() { return new DefaultHashFunctionProvider(); } } {code} and re-implement {{PrePartitionOperator}} {{recreateHashFunction}} method. {code:java} private HashFunction recreateHashFunction(TableId tableId) { return hashFunctionProvider.getHashFunction(loadLatestSchemaFromRegistry(tableId)); } {code} was: The {{PrePartitionOperator}} in its current implementation only supports a fixed {{HashFunction}} ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}). This limits the ability of Sink implementations to customize the partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of partitioned tables, it would be advantageous to allow hashing based on partition keys, hashing according to table names, or using the database engine's internal primary key hash functions (such as with MaxCompute DataSink). When users require such custom partitioning logic, they are compelled to implement their PartitionOperator, which undermines the utility of {{{}PrePartitionOperator{}}}. To address this limitation, it would be highly desirable to enable the {{PrePartitionOperator}} to support user-specified custom {{{}HashFunction{}}}s (Function). A possible solution could involve a mechanism analogous to the {{DataSink}} interface, allowing the specification of a {{HashFunctionFactory}} class path in the configuration file. This enhancement would greatly facilitate users in tailoring partition strategies to meet their specific application needs. 
Summary: Allow Sink to Choose HashFunction in PrePartitionOperator for Flink Sink Customization (was: Allow Custom HashFunction in PrePartitionOperator for Flink Sink Customization) > Allow Sink to Choose HashFunction in PrePartitionOperator for Flink Sink > Customization > -- > > Key: FLINK-35237 > URL: https://issues.apache.org/jira/browse/FLINK-35237 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: zhangdingxin >Priority: Major > > The {{PrePartitionOperator}} in its current implementation only supports a > fixed {{HashFunction}} > ({{{}org.apache.flink.cdc.runtime.partitioning.PrePartitionOperator.HashFunction{}}}). > This limits the ability of Sink implementations to customize the > partitioning logic for {{{}DataChangeEvent{}}}s. For example, in the case of > partitioned tables, it would be advantageous to allow hashing based on > partition keys, hashing according to table names, or using the database > engine's internal primary key hash functions (such as with MaxCompute > DataSink). > When users require such custom partitioning logic, they are compelled to > implement
[PR] [docs] Rectify names of CDC sources for Flink [flink-cdc]
yuxiqian opened a new pull request, #3310: URL: https://github.com/apache/flink-cdc/pull/3310 Since Flink CDC 3.0 release, pipeline connectors are now first-class connectors. At the same time, all previous connectors designed for Flink are now called "legacy" connectors, which is misleading since they're still under active development and improvement, and there's no plan to drop support for them. This PR rectifies the term from "Legacy CDC Sources" to "CDC Sources for Flink" in CDC documentations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35089][runtime] Initialize lastRecordAttributes in AbstractStreamOperator during setup [flink]
elvirasru commented on PR #24655: URL: https://github.com/apache/flink/pull/24655#issuecomment-2104180848 Hello, Could you tell me when the new version with this fix will be released? Thank you in advance! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35197][table] Support the execution of supsend&resume materialized table in continuous refresh mode [flink]
lsyldliu commented on code in PR #24765: URL: https://github.com/apache/flink/pull/24765#discussion_r1596318366 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ObjectIdentifier; + +import java.util.Map; + +/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. */ +@Internal +public class AlterMaterializedTableResumeOperation extends AlterMaterializedTableOperation { + +private final Map options; Review Comment: It would be better to rename it to `dynamicOptions`. ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ObjectIdentifier; + +import java.util.Map; + +/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. 
*/ +@Internal +public class AlterMaterializedTableResumeOperation extends AlterMaterializedTableOperation { + +private final Map options; + +public AlterMaterializedTableResumeOperation( +ObjectIdentifier tableIdentifier, Map options) { +super(tableIdentifier); +this.options = options; +} + +public Map getOptions() { +return options; +} + +@Override +public TableResultInternal execute(Context ctx) { +throw new TableException("This method shouldn't be called."); Review Comment: We should throw the following exception: ``` throw new UnsupportedOperationException( "AlterMaterializedTableResumeOperation doesn't support ExecutableOperation yet.") ``` ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -169,6 +284,75 @@ private static void createMaterializedInContinuousMode( } } +private static String stopJobWithSavepoint( +OperationExecutor executor, +OperationHandle handle, +ContinuousRefreshHandler refreshHandler) { +ResultFetcher resultFetcher = +executor.callStopJobOperation( +executor.getTableEnvironment(), +handle, +new StopJobOperation(refreshHandler.getJobId(), true, false)); +List results = fetchAllResults(resultFetcher); +return results.get(0).getString(0).toString(); +} + +private static ContinuousRefreshHandler deserializeContinuousHandler( +byte[] serializedRefreshHandler, ClassLoader classLoader) { +try { +return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( +serializedRefreshHandler, class
Re: [PR] [FLINK-34993] parser changes to support model CRUD sql [flink]
flinkbot commented on PR #24769: URL: https://github.com/apache/flink/pull/24769#issuecomment-2104149510 ## CI report: * add5f94934ccdbc9d2ad7e8624b00d27b61fb6a4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34993] parser changes to support model CRUD sql [flink]
lihaosky commented on PR #24769: URL: https://github.com/apache/flink/pull/24769#issuecomment-2104148834 cc @twalthr , @wuchong -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35331) Download links for binary releases are displayed as source releases on website
Xintong Song created FLINK-35331: Summary: Download links for binary releases are displayed as source releases on website Key: FLINK-35331 URL: https://issues.apache.org/jira/browse/FLINK-35331 Project: Flink Issue Type: Bug Components: Project Website Reporter: Xintong Song Take Pre-bundled Hadoop as an example. The contents for download are binary releases, while the link is displayed as "Pre-bundled Hadoop 2.x.y Source Release (asc, sha512)". The problem is caused by misusing `source_release_[url|asc_url|sha512_url]` for binary contents in the corresponding [yaml file.|https://github.com/apache/flink-web/blob/asf-site/docs/data/additional_components.yml] There are many similar cases on the webpage. And a relevant issue is that some source releases are displayed as "XXX Source Release Source Release", due to including "Source Release" in the `name` field of the corresponding yaml file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34993) Support Model CRUD in parser
[ https://issues.apache.org/jira/browse/FLINK-34993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-34993: --- Labels: pull-request-available (was: ) > Support Model CRUD in parser > > > Key: FLINK-34993 > URL: https://issues.apache.org/jira/browse/FLINK-34993 > Project: Flink > Issue Type: Sub-task >Reporter: Hao Li >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-34993] parser changes to support model CRUD sql [flink]
lihaosky opened a new pull request, #24769: URL: https://github.com/apache/flink/pull/24769 ## What is the purpose of the change Support model CRUD sql syntax ## Brief change log Add following syntax for model * `create model` * `drop model` * `show model` * `show create model` * `alter model` ## Verifying this change Unit test for parser ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (n) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-34324) s3_setup is called in test_file_sink.sh even if the common_s3.sh is not sourced
[ https://issues.apache.org/jira/browse/FLINK-34324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845238#comment-17845238 ] Matthias Pohl edited comment on FLINK-34324 at 5/10/24 8:07 AM: * master: [93526c2f3247598ce80854cf65dd4440eb5aaa43|https://github.com/apache/flink/commit/93526c2f3247598ce80854cf65dd4440eb5aaa43] * 1.19: [8707c63ee147085671a9ae1b294854bac03fc914|https://github.com/apache/flink/commit/8707c63ee147085671a9ae1b294854bac03fc914] * 1.18: [7d98ab060be82fe3684d15501b9eb83373303d18|https://github.com/apache/flink/commit/7d98ab060be82fe3684d15501b9eb83373303d18] was (Author: mapohl): * master ** [93526c2f3247598ce80854cf65dd4440eb5aaa43|https://github.com/apache/flink/commit/93526c2f3247598ce80854cf65dd4440eb5aaa43] * 1.19 ** [8707c63ee147085671a9ae1b294854bac03fc914|https://github.com/apache/flink/commit/8707c63ee147085671a9ae1b294854bac03fc914] * 1.18 ** [7d98ab060be82fe3684d15501b9eb83373303d18|https://github.com/apache/flink/commit/7d98ab060be82fe3684d15501b9eb83373303d18] > s3_setup is called in test_file_sink.sh even if the common_s3.sh is not > sourced > --- > > Key: FLINK-34324 > URL: https://issues.apache.org/jira/browse/FLINK-34324 > Project: Flink > Issue Type: Bug > Components: Connectors / Hadoop Compatibility, Tests >Affects Versions: 1.17.2, 1.19.0, 1.18.1 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > See example CI run from the FLINK-34150 PR: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56570&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=3191 > {code} > /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/test_file_sink.sh: > line 38: s3_setup: command not found > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-34324) s3_setup is called in test_file_sink.sh even if the common_s3.sh is not sourced
[ https://issues.apache.org/jira/browse/FLINK-34324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-34324. --- Fix Version/s: 1.18.2 1.20.0 1.19.1 Resolution: Fixed * master ** [93526c2f3247598ce80854cf65dd4440eb5aaa43|https://github.com/apache/flink/commit/93526c2f3247598ce80854cf65dd4440eb5aaa43] * 1.19 ** [8707c63ee147085671a9ae1b294854bac03fc914|https://github.com/apache/flink/commit/8707c63ee147085671a9ae1b294854bac03fc914] * 1.18 ** [7d98ab060be82fe3684d15501b9eb83373303d18|https://github.com/apache/flink/commit/7d98ab060be82fe3684d15501b9eb83373303d18] > s3_setup is called in test_file_sink.sh even if the common_s3.sh is not > sourced > --- > > Key: FLINK-34324 > URL: https://issues.apache.org/jira/browse/FLINK-34324 > Project: Flink > Issue Type: Bug > Components: Connectors / Hadoop Compatibility, Tests >Affects Versions: 1.17.2, 1.19.0, 1.18.1 >Reporter: Matthias Pohl >Assignee: Matthias Pohl >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > See example CI run from the FLINK-34150 PR: > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56570&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=3191 > {code} > /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/test_file_sink.sh: > line 38: s3_setup: command not found > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [BP-1.18][FLINK-34324][ci] Replaces AWS-based S3 e2e tests with Minio-backed version [flink]
XComp merged PR #24604: URL: https://github.com/apache/flink/pull/24604 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.19][FLINK-34324][ci] Replaces AWS-based S3 e2e tests with Minio-backed version [flink]
XComp merged PR #24605: URL: https://github.com/apache/flink/pull/24605 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34324][ci] Replaces AWS-based S3 e2e tests with Minio-backed version [flink]
XComp merged PR #24465: URL: https://github.com/apache/flink/pull/24465 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]
snuyanzin commented on PR #24659: URL: https://github.com/apache/flink/pull/24659#issuecomment-2104083386 Thanks for the contribution @empathy87 thanks for the review @jeyhunkarimov It looks ok to me I left a minor comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-33989) Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector
[ https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842254#comment-17842254 ] Flaviu Cicio edited comment on FLINK-33989 at 5/10/24 7:30 AM: --- Thank you all for your involvement! >From my understanding, FLINK-9528 only refers to the internal Flink changelog. In our case, the issue is when writing to Kafka using the upsert-kafka sink. As the connector's name suggests, the sink should produce an upsert stream. That is, for each update, there should be an update-after record. Currently, the sink produces a retraction-like stream. That is, for each update it generates a retraction (tombstone generated from the update-before record) and an update-after. Based on [FLIP-149|https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector] and the [mail thread|https://www.mail-archive.com/dev@flink.apache.org/msg42543.html], which stands behind this change, I didn't find any statement implying this behavior. The only mention I found was in this GitHub [comment|https://github.com/apache/flink/pull/13850#discussion_r515759845]. Thus, I propose a fix that skips the update-before records when writing to Kafka using the kafka-upsert connector. This can be achieved by rewriting this [code block|https://github.com/apache/flink-connector-kafka/blob/369e7be46a70fd50d68746498aed82105741e7d6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L101] to: {code:java} if (kind == RowKind.DELETE) { valueSerialized = null; else if (kind == RowKind.UPDATE_BEFORE) { return null; } {code} was (Author: JIRAUSER302297): Thank you all for your involvement! >From my understanding, FLINK-9528 only refers to the internal Flink changelog. In our case, the issue seems to be when writing to Kafka using the upsert-kafka sink. As the connector's name suggests, the sink should produce an upsert stream. 
That is, for each update, there should be an update-after record. Currently, the sink produces a retraction-like stream. That is, for each update it generates a retraction (tombstone generated from the update-before record) and an update-after. Based on [FLIP-149|https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector] and the [mail thread|https://www.mail-archive.com/dev@flink.apache.org/msg42543.html], which stands behind this change, I didn't find any statement implying this behavior. The only mention I found was in this GitHub [comment|https://github.com/apache/flink/pull/13850#discussion_r515759845]. Thus, I propose a fix that skips the update-before records when writing to Kafka using the kafka-upsert connector. This can be achieved by rewriting this [code block|https://github.com/apache/flink-connector-kafka/blob/369e7be46a70fd50d68746498aed82105741e7d6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L101] to: {code:java} if (kind == RowKind.DELETE) { valueSerialized = null; else if (kind == RowKind.UPDATE_BEFORE) { return null; } {code} > Insert Statement With Filter Operation Generates Extra Tombstone using Upsert > Kafka Connector > - > > Key: FLINK-33989 > URL: https://issues.apache.org/jira/browse/FLINK-33989 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Table SQL / Runtime >Affects Versions: 1.17.2 >Reporter: Flaviu Cicio >Priority: Major > > Given the following Flink SQL tables: > {code:sql} > CREATE TABLE input ( > id STRING NOT NULL, > current_value STRING NOT NULL, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'input', > 'key.format' = 'raw', > 'properties.bootstrap.servers' = 'kafka:29092', > 'properties.group.id' = 'your_group_id', > 'value.format' = 'json' > ); > CREATE TABLE output ( > id STRING NOT NULL, > current_value STRING NOT NULL, > PRIMARY KEY (id) NOT 
ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'output', > 'key.format' = 'raw', > 'properties.bootstrap.servers' = 'kafka:29092', > 'properties.group.id' = 'your_group_id', > 'value.format' = 'json' > ); {code} > And, the following entries are present in the input Kafka topic: > {code:json} > [ > { > "id": "1", > "current_value": "abc" > }, > { > "id": "1", > "current_value": "abcd" > } > ]{code} > If we execute the following statement: > {code:sql} > INSERT INTO output SELECT id, current_value FROM input; {code} > The following entries are published to the output Kafka topic: > {code:json} >
Re: [PR] [FLINK-35098][ORC] Fix incorrect results with literal first expressions [flink]
snuyanzin commented on code in PR #24659: URL: https://github.com/apache/flink/pull/24659#discussion_r1596379442 ## flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFileSystemFilterTest.java: ## @@ -86,4 +83,49 @@ public void testApplyPredicate() { OrcFilters.Predicate predicate8 = new OrcFilters.And(predicate4, predicate6); assertThat(predicate7).hasToString(predicate8.toString()); } + +@Test +@SuppressWarnings("unchecked") +public void testApplyPredicateReverse() { Review Comment: ```suggestion void testApplyPredicateReverse() { ``` JUnit5 doesn't need public modifiers, so it's better harden them could you please also do same for the test method above? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] chore: fix typo [flink-kubernetes-operator]
1996fanrui commented on PR #812: URL: https://github.com/apache/flink-kubernetes-operator/pull/812#issuecomment-2104048726 This PR has been opened for 1 month, and don't response more than 2 weeks. I close it first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] chore: fix typo [flink-kubernetes-operator]
1996fanrui closed pull request #812: chore: fix typo URL: https://github.com/apache/flink-kubernetes-operator/pull/812 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35328] AutoScale supports setting the maximum floating parallelism by the number of Pulsar partitions [flink-kubernetes-operator]
wenbingshen commented on code in PR #827: URL: https://github.com/apache/flink-kubernetes-operator/pull/827#discussion_r1596355996 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java: ## @@ -247,17 +249,36 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished); } -private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology) -throws Exception { +private void updateKafkaPulsarSourceMaxParallelisms( +Context ctx, JobID jobId, JobTopology topology) throws Exception { try (var restClient = ctx.getRestClusterClient()) { -var partitionRegex = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$"); +Pattern partitionRegex = +Pattern.compile( + "^.*\\.KafkaSourceReader\\.topic\\.(?<kafkaTopic>.+)\\.partition\\.(?<kafkaPartition>\\d+)\\.currentOffset$" ++ "|^.*\\.PulsarConsumer\\.(?<pulsarTopic>.+)-partition-(?<pulsarPartition>\\d+)\\..*\\.numMsgsReceived$"); for (var vertexInfo : topology.getVertexInfos().values()) { if (vertexInfo.getInputs().isEmpty()) { var sourceVertex = vertexInfo.getId(); var numPartitions = queryAggregatedMetricNames(restClient, jobId, sourceVertex).stream() -.filter(partitionRegex.asMatchPredicate()) -.count(); +.map( Review Comment: Yes, according to my search, Pulsar Source does not have metrics at partition level, all partition level metrics are associated with consumerId. Thanks for review. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org