[jira] [Created] (FLINK-35169) Recycle buffers to freeSegments before releasing data buffer for sort accumulator
Yuxin Tan created FLINK-35169:
---------------------------------

             Summary: Recycle buffers to freeSegments before releasing data buffer for sort accumulator
                 Key: FLINK-35169
                 URL: https://issues.apache.org/jira/browse/FLINK-35169
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Network
    Affects Versions: 1.20.0
            Reporter: Yuxin Tan
            Assignee: Yuxin Tan

When using sortBufferAccumulator, we should recycle the buffers to freeSegments before releasing the data buffer. The reason is that when getting buffers from the DataBuffer, it may require more buffers than the current quantity available in freeSegments. Consequently, to ensure adequate buffers from DataBuffer, the flushed and recycled buffers should also be added to freeSegments for reuse.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
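The ordering described in this issue can be illustrated with a small, self-contained sketch. It is not Flink's actual sort accumulator code: the class `FreeSegmentPoolSketch`, its methods, and the integer "segments" are hypothetical stand-ins, and the sketch assumes only that a new data buffer draws its segments from a shared `freeSegments` queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of a segment pool; not Flink's real classes. */
public class FreeSegmentPoolSketch {
    /** Segments currently available for building a new data buffer. */
    private final Deque<Integer> freeSegments = new ArrayDeque<>();
    /** Segments held by the current data buffer (treated as already flushed here). */
    private final List<Integer> flushedSegments = new ArrayList<>();

    public FreeSegmentPoolSketch(int free, int inUse) {
        for (int i = 0; i < free; i++) {
            freeSegments.add(i);
        }
        for (int i = 0; i < inUse; i++) {
            flushedSegments.add(free + i);
        }
    }

    /** Returns the flushed segments to freeSegments so a later buffer can reuse them. */
    public void recycleFlushedSegments() {
        freeSegments.addAll(flushedSegments);
        flushedSegments.clear();
    }

    /** Takes the required number of segments for a new data buffer, or fails if too few are free. */
    public boolean tryCreateDataBuffer(int required) {
        if (freeSegments.size() < required) {
            return false;
        }
        for (int i = 0; i < required; i++) {
            flushedSegments.add(freeSegments.poll());
        }
        return true;
    }

    public int freeCount() {
        return freeSegments.size();
    }

    public static void main(String[] args) {
        // Without recycling first, only 2 segments are free and a request for 4 fails.
        FreeSegmentPoolSketch withoutRecycle = new FreeSegmentPoolSketch(2, 3);
        System.out.println(withoutRecycle.tryCreateDataBuffer(4)); // false

        // Recycling first makes all 5 segments available, so the same request succeeds.
        FreeSegmentPoolSketch withRecycle = new FreeSegmentPoolSketch(2, 3);
        withRecycle.recycleFlushedSegments();
        System.out.println(withRecycle.tryCreateDataBuffer(4)); // true
    }
}
```

The point of the sketch is only the order of operations: the request for a new buffer is made after the flushed segments are back in the pool, so it sees the full quantity.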
Re: [PR] [FLINK-34996][Connectors/Kafka] Use UserCodeCL to instantiate Deserializer [flink-connector-kafka]
hugogu commented on PR #89: URL: https://github.com/apache/flink-connector-kafka/pull/89#issuecomment-2065806444 Hi @uce and @rmetzger, would you mind reviewing the change and letting me know if any changes are required before merging? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35154] Javadoc generating fix [flink]
flinkbot commented on PR #24684: URL: https://github.com/apache/flink/pull/24684#issuecomment-2065793603 ## CI report: * b2a3ede0b60213c2dc38e5ff050c5f827e1e86bc UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-35154) Javadoc aggregate fails
[ https://issues.apache.org/jira/browse/FLINK-35154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-35154:
-----------------------------------
    Labels: pull-request-available  (was: )

> Javadoc aggregate fails
> -----------------------
>
>                 Key: FLINK-35154
>                 URL: https://issues.apache.org/jira/browse/FLINK-35154
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Dmitriy Linevich
>            Priority: Minor
>              Labels: pull-request-available
>
> The Javadoc plugin fails with an error when using
> {code:java}
> javadoc:aggregate{code}
> ERROR:
> {code:java}
> [WARNING] The requested profile "include-hadoop" could not be activated because it does not exist.
> [WARNING] The requested profile "arm" could not be activated because it does not exist.
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:aggregate (default-cli) on project flink-parent: An error has occurred in JavaDocs report generation:
> [ERROR] Exit code: 1 - /{flinkProjectDir}/flink-end-to-end-tests/flink-confluent-schema-registry/target/generated-sources/example/avro/EventType.java:8: error: type GenericEnumSymbol does not take parameters
> [ERROR] public enum EventType implements org.apache.avro.generic.GenericEnumSymbol {
> [ERROR] {code}
>
> For our Flink 1.17, the error is:
> {code:java}
> [ERROR] /{flinkProjectDir}/flink-connectors/flink-sql-connector-hive-3.1.3/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java:21: error: cannot find symbol
> [ERROR] import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
> [ERROR] ^
> [ERROR] symbol: static DEFAULT_DATABASE_NAME
> [ERROR] location: class{code}
>
> The version of the javadoc plugin needs to be increased from 2.9.1 to 2.10.4 (the current version has the same bug as [this|https://github.com/checkstyle/checkstyle/issues/291]).
[PR] [FLINK-35154] Javadoc generating fix [flink]
ldadima opened a new pull request, #24684: URL: https://github.com/apache/flink/pull/24684 ## What is the purpose of the change To fix [FLINK-35154](https://issues.apache.org/jira/browse/FLINK-35154) ## Verifying this change Run mvn javadoc:aggregate -Prelease ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? ( no) - If yes, how is the feature documented? (not applicable)
Re: [PR] [FLINK-33460][Connector/JDBC] Support property authentication connection. [flink-connector-jdbc]
RocMarshal commented on code in PR #115: URL: https://github.com/apache/flink-connector-jdbc/pull/115#discussion_r1571809638 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcConnectionOptions.java: ## @@ -90,6 +101,11 @@ public JdbcConnectionOptionsBuilder withDriverName(String driverName) { return this; } +public JdbcConnectionOptionsBuilder withExtendProperties(Properties extendProperties) { Review Comment: @eskabetxe Thanks for your quick review. The comments sound good to me! I have updated the PR based on them. PTAL~ Thank you very much :)
[jira] [Comment Edited] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854 ]

Vadim Vararu edited comment on FLINK-35115 at 4/19/24 4:54 AM:
---------------------------------------------------------------

[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?

was (Author: JIRAUSER305101):
    [~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, rigth?

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-35115
>                 URL: https://issues.apache.org/jira/browse/FLINK-35115
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
>         Environment: The issue happens in a *Kinesis -> Flink -> Kafka* exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode:
> {code:java}
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment ();
> execEnv.enableCheckpointing (6,EXACTLY_ONCE);
> execEnv.getCheckpointConfig ().setCheckpointTimeout (9);
> execEnv.getCheckpointConfig ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
>     .setBootstrapServers ("localhost:9092")
>     .setTransactionalIdPrefix ("test-prefix")
>     .setDeliverGuarantee (EXACTLY_ONCE)
>     .setKafkaProducerConfig (sinkConfig)
>     .setRecordSerializer (
>         (KafkaRecordSerializationSchema) (element, context, timestamp) -> new ProducerRecord<> (
>             "test-output-topic", null, element.getBytes ()))
>     .build (); {code}
>  * Kinesis consumer defined as:
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new FlinkKinesisConsumer<> ("test-stream",
>     new AbstractDeserializationSchema<> () {
>         @Override
>         public ByteBuffer deserialize (byte[] bytes) {
>             // Return
>             return ByteBuffer.wrap (bytes);
>         }
>     }, props); {code}
>
>            Reporter: Vadim Vararu
>            Assignee: Aleksandr Pilipenko
>            Priority: Blocker
>              Labels: kinesis, pull-request-available
>             Fix For: aws-connector-4.3.0
>
>
> With an exactly-once Kinesis -> Flink -> Kafka job, triggering a stop-with-savepoint causes Flink, at resume, to duplicate in Kafka all the records between the last checkpoint and the savepoint:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink
>  * Event1 is committed to Kafka at the checkpoint
>  *
>
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  *
>
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first checkpoint{color}*
>
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file generated at stop-with-savepoint, and it's the one from the checkpoint before the savepoint instead of the one of the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka as the source instead of Kinesis, and the behaviour does not reproduce.
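The reported sequence of events can be replayed in a toy model to show why a stale source position in the savepoint produces duplicates. This sketch makes no claim about the connector's internals: `SavepointReplaySketch`, its `resume` method, and the integer positions are hypothetical, and it assumes only that the source re-reads every record after the position stored in the savepoint.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replay model; it does not use any Flink or Kinesis API. */
public class SavepointReplaySketch {

    /**
     * Returns the records a source would emit again when resuming.
     * Records are numbered 1..n, and savedPosition is the number of the
     * last record the savepoint claims was already consumed.
     */
    public static List<String> resume(List<String> stream, int savedPosition) {
        return new ArrayList<>(stream.subList(savedPosition, stream.size()));
    }

    public static void main(String[] args) {
        List<String> stream = List.of("Event1", "Event2");

        // Position taken from the checkpoint before the savepoint: Event2 is
        // read again although it was already committed to the sink.
        System.out.println(resume(stream, 1)); // [Event2]

        // Position of the last record committed at stop-with-savepoint:
        // nothing is replayed.
        System.out.println(resume(stream, 2)); // []
    }
}
```

In this model the duplicate comes purely from the gap between the stored position and what the sink has already committed, which matches the first of the two observations in the report.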
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838854#comment-17838854 ] Vadim Vararu commented on FLINK-35115: -- [~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, rigth?
[jira] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115 ] Vadim Vararu deleted comment on FLINK-35115: -- was (Author: JIRAUSER305101): [~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?
[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838853#comment-17838853 ] Vadim Vararu commented on FLINK-35115: -- [~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?
Re: [PR] [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController [flink]
Zakelly commented on code in PR #24676: URL: https://github.com/apache/flink/pull/24676#discussion_r1571777947 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -133,6 +136,20 @@ public final ThrowingConsumer, Exception> getRecordProcessor getClass().getName(), inputId)); } +@Override +public final OperatorSnapshotFutures snapshotState( +long checkpointId, +long timestamp, +CheckpointOptions checkpointOptions, +CheckpointStreamFactory factory) +throws Exception { +asyncExecutionController.drainInflightRecords(0); Review Comment: Ah... I almost forgot. Should we check `isAsyncStateProcessingEnabled` first?
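The check suggested in this review comment could look roughly like the guard below. This is a sketch and not the PR's code: `DrainGuardSketch` and `SketchController` are hypothetical stand-ins, and only the names `isAsyncStateProcessingEnabled` and `drainInflightRecords` are borrowed from the diff and the comment.

```java
/** Hypothetical operator model; it does not extend any Flink class. */
public class DrainGuardSketch {

    /** Stand-in for the controller; it only counts how often a drain was requested. */
    public static class SketchController {
        public int drainCalls = 0;

        public void drainInflightRecords(int targetNum) {
            drainCalls++;
        }
    }

    private final boolean asyncStateProcessingEnabled;
    public final SketchController controller = new SketchController();

    public DrainGuardSketch(boolean asyncStateProcessingEnabled) {
        this.asyncStateProcessingEnabled = asyncStateProcessingEnabled;
    }

    public boolean isAsyncStateProcessingEnabled() {
        return asyncStateProcessingEnabled;
    }

    /** Drains in-flight records before snapshotting, but only in async mode. */
    public void snapshotState() {
        if (isAsyncStateProcessingEnabled()) {
            controller.drainInflightRecords(0);
        }
        // The regular snapshot logic would follow here.
    }

    public static void main(String[] args) {
        DrainGuardSketch async = new DrainGuardSketch(true);
        async.snapshotState();
        System.out.println(async.controller.drainCalls); // 1

        DrainGuardSketch sync = new DrainGuardSketch(false);
        sync.snapshotState();
        System.out.println(sync.controller.drainCalls); // 0
    }
}
```

With the guard in place, an operator running in synchronous mode never touches the controller during a snapshot, which is the concern the comment raises.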
Re: [PR] [FLINK-32315][k8s] Support uploading "local://" artifacts in Kubernetes Application Mode [flink]
ferenc-csaky commented on PR #24303: URL: https://github.com/apache/flink/pull/24303#issuecomment-2065736703 @flinkbot run azure
Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]
Zakelly commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1571771428 ## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java: ## @@ -181,4 +182,73 @@ public class ExecutionOptions { + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs" + SORT_INPUTS.key() + " to be enabled."); + +/** + * A flag to enable or disable async mode related components when tasks initialize. As long as + * this option is enabled, the state access of Async state APIs will be executed asynchronously. + * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync + * state APIs, the state access is always executed synchronously, enable this option would bring + * some overhead. + * + * Note: This is an experimental feature(FLIP-425) under evaluation. + */ +@Experimental +public static final ConfigOption ASYNC_STATE_ENABLED = +ConfigOptions.key("execution.async-mode.enabled") +.booleanType() +.defaultValue(false) +.withDescription( +"A flag to enable or disable async mode related components when tasks initialize." ++ " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously." ++ " Otherwise, the state access of Async state APIs will be executed synchronously." ++ " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n" ++ " Note: This is an experimental feature under evaluation."); + +/** + * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the + * records that have entered the operator but have not yet been processed and emitted to the + * downstream. If the in-flight records number exceeds the limit, the newly records entering + * will be blocked until the in-flight records number drops below the limit. 
+ */ +@Experimental +public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT = +ConfigOptions.key("execution.async-mode.in-flight-records-limit") +.intType() +.defaultValue(6000) +.withDescription( +"The max limit of in-flight records number in async execution mode, 'in-flight' refers" ++ " to the records that have entered the operator but have not yet been processed and" ++ " emitted to the downstream. If the in-flight records number exceeds the limit," ++ " the newly records entering will be blocked until the in-flight records number drops below the limit."); + +/** + * The size of buffer under async execution mode. Async execution mode provides a buffer + * mechanism to reduce state access. When the number of state requests in the buffer exceeds the + * batch size, a batched state execution would be triggered. Larger batch sizes will bring + * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control + * the frequency of triggering. + */ +@Experimental +public static final ConfigOption ASYNC_BUFFER_SIZE = +ConfigOptions.key("execution.async-mode.buffer-size") +.intType() +.defaultValue(1000) +.withDescription( +"The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access." ++ " When the number of state requests in the active buffer exceeds the batch size," ++ " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency," ++ " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering."); + +/** + * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link + * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively. 
+ */ +@Experimental +public static final ConfigOption ASYNC_BUFFER_TIMEOUT = +ConfigOptions.key("execution.async-state.buffer-timeout") Review Comment: Sorry guys, I'm afraid I prefer the original proposal of `async-state` in the FLIP, since this is only for the stateful operator and even the problem of `record-order` is caused by state
Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]
Zakelly commented on code in PR #24672: URL: https://github.com/apache/flink/pull/24672#discussion_r1571754617 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java: ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.function.BiConsumerWithException; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * An implementation of {@link InternalTimerService} that is used by {@link + * org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}. + * The timer service will set {@link RecordContext} for the timers before invoking action to + * preserve the execution order between timer firing and records processing. + * + * @see https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Timers>FLIP-425 + * timers section. + * @param Type of timer's key. + * @param Type of the namespace to which timers are scoped. 
+ */ +@Internal +public class InternalTimerServiceAsyncImpl extends InternalTimerServiceImpl { + +private AsyncExecutionController asyncExecutionController; + +InternalTimerServiceAsyncImpl( +TaskIOMetricGroup taskIOMetricGroup, +KeyGroupRange localKeyGroupRange, +KeyContext keyContext, +ProcessingTimeService processingTimeService, +KeyGroupedInternalPriorityQueue> processingTimeTimersQueue, +KeyGroupedInternalPriorityQueue> eventTimeTimersQueue, +StreamTaskCancellationContext cancellationContext, +AsyncExecutionController asyncExecutionController) { +super( +taskIOMetricGroup, +localKeyGroupRange, +keyContext, +processingTimeService, +processingTimeTimersQueue, +eventTimeTimersQueue, +cancellationContext); +this.asyncExecutionController = asyncExecutionController; +} + +void onProcessingTime(long time) throws Exception { Review Comment: add `@Override` here? ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java: ## @@ -179,6 +180,55 @@ InternalTimerServiceImpl registerOrGetTimerService( return timerService; } +@Override +public InternalTimerService getAsyncInternalTimerService( +String name, +TypeSerializer keySerializer, +TypeSerializer namespaceSerializer, +Triggerable triggerable, +AsyncExecutionController asyncExecutionController) { +checkNotNull(keySerializer, "Timers can only be used on keyed operators."); + +// the following casting is to overcome type restrictions. +TimerSerializer timerSerializer = +new TimerSerializer<>(keySerializer, namespaceSerializer); + +InternalTimerServiceAsyncImpl timerService = +registerOrGetAsyncTimerService(name, timerSerializer, asyncExecutionController); + +timerService.startTimerService( +timerSerializer.getKeySerializer(), +timerSerializer.getNamespaceSerializer(), +triggerable); + +return timerService; +} + + InternalTimerServiceAsyncImpl registerOrGetAsyncTimerService( +String name, +TimerSerializer timerSerializer, +AsyncExecutionController
[jira] [Updated] (FLINK-35168) Basic State Iterator for async processing
[ https://issues.apache.org/jira/browse/FLINK-35168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zakelly Lan updated FLINK-35168:
--------------------------------
    Component/s: Runtime / State Backends

> Basic State Iterator for async processing
> -----------------------------------------
>
>                 Key: FLINK-35168
>                 URL: https://issues.apache.org/jira/browse/FLINK-35168
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / State Backends
>            Reporter: Zakelly Lan
>            Assignee: Zakelly Lan
>            Priority: Major
>
[jira] [Created] (FLINK-35168) Basic State Iterator for async processing
Zakelly Lan created FLINK-35168:
-----------------------------------

             Summary: Basic State Iterator for async processing
                 Key: FLINK-35168
                 URL: https://issues.apache.org/jira/browse/FLINK-35168
             Project: Flink
          Issue Type: Sub-task
            Reporter: Zakelly Lan
            Assignee: Zakelly Lan
Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]
Zakelly commented on code in PR #24681: URL: https://github.com/apache/flink/pull/24681#discussion_r1571721125 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; +import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; +import org.apache.flink.runtime.state.v2.InternalValueState; +import org.apache.flink.runtime.state.v2.ValueStateDescriptor; + +import org.rocksdb.ColumnFamilyHandle; + +import java.io.IOException; + +/** + * The {@link InternalValueState} implement for ForStDB. + * + * @param The type of the key. + * @param The type of the value. + */ +public class ForStValueState extends InternalValueState +implements ValueState, ForStInnerTable, V> { + +/** The column family which this internal value state belongs to. 
*/ +private final ColumnFamilyHandle columnFamilyHandle; + +/** The serialized key builder which need be thread-safe. */ +private final ThreadLocal> serializedKeyBuilder; + +/** The data outputStream used for value serializer, which need be thread-safe. */ +private final ThreadLocal valueSerializerView; + +/** The data inputStream used for value deserializer, which need be thread-safe. */ +private final ThreadLocal valueDeserializerView; + +public ForStValueState( +StateRequestHandler stateRequestHandler, +ColumnFamilyHandle columnFamily, +ValueStateDescriptor valueStateDescriptor, +ThreadLocal> serializedKeyBuilder, +ThreadLocal valueSerializerView, +ThreadLocal valueDeserializerView) { +super(stateRequestHandler, valueStateDescriptor); +this.columnFamilyHandle = columnFamily; +this.serializedKeyBuilder = serializedKeyBuilder; +this.valueSerializerView = valueSerializerView; +this.valueDeserializerView = valueDeserializerView; +} + +@Override +public ColumnFamilyHandle getColumnFamilyHandle() { +return columnFamilyHandle; +} + +@Override +public byte[] serializeKey(ContextKey contextKey) throws IOException { Review Comment: Should `ContextKey` be thread-safe? It seems the 'read cache or serialize' logic better be called only once for each context key. ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMultiGetOperation.java: ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import java.io.IOException; +import java.util.List; + +/** + * The native multiGet operation implementation for ForStDB. + * + * @param The type of key in get access request. + * @param The type of value in get access request. + */ +public class ForStNativeMultiGetOperation implements ForStDBOperation> { Review Comment: We can introduce this later and may be excluded from this PR? ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ContextKey.java: ## @@ -0,0
Re: [PR] [FLINK-35127] remove HybridSource to avoid CI failure. [flink-cdc]
lvyanquan commented on PR #3237: URL: https://github.com/apache/flink-cdc/pull/3237#issuecomment-2065637354

The failure in the current CI contains the following error message:

```
(4185ef2c5087300d32f25ea842d3ec59_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause: java.lang.IllegalStateException: The "before" field of UPDATE/DELETE message is null, please check the Postgres table has been set REPLICA IDENTITY to FULL level. You can update the setting by running the command in Postgres 'ALTER TABLE inventory.products REPLICA IDENTITY FULL'. Please see more in Debezium documentation: https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-replica-identity
	at org.apache.flink.cdc.connectors.postgres.table.PostgresValueValidator.validate(PostgresValueValidator.java:44) ~[classes/:?]
```

This failure is unrelated to this PR. CC @loserwang1024.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35166] Make the SortBufferAccumulator use more buffers when the parallelism is small [flink]
flinkbot commented on PR #24683: URL: https://github.com/apache/flink/pull/24683#issuecomment-2065635252 ## CI report: * 0d0a7bcca23de55518958f32eb36a242bd4b615d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]
flinkbot commented on PR #24682: URL: https://github.com/apache/flink/pull/24682#issuecomment-2065635116 ## CI report: * a5be0a188101bcaf6fb76c8d5144ad002a79a192 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35167) Introduce MaxCompute pipeline DataSink
[ https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren updated FLINK-35167: -- Fix Version/s: cdc-3.2.0 > Introduce MaxCompute pipeline DataSink > -- > > Key: FLINK-35167 > URL: https://issues.apache.org/jira/browse/FLINK-35167 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: zhangdingxin >Assignee: zhangdingxin >Priority: Major > Fix For: cdc-3.2.0 > > Original Estimate: 504h > Remaining Estimate: 504h > > By integrating the MaxCompute DataSink, we enable the precise and efficient > synchronization of data from Flink's Change Data Capture (CDC) into > MaxCompute. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35167) Introduce MaxCompute pipeline DataSink
[ https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838836#comment-17838836 ] zhangdingxin edited comment on FLINK-35167 at 4/19/24 2:22 AM: --- I'd like to do it was (Author: JIRAUSER305142): I'd like to do it [|https://issues.apache.org/jira/secure/AddComment!default.jspa?id=13575588] > Introduce MaxCompute pipeline DataSink > -- > > Key: FLINK-35167 > URL: https://issues.apache.org/jira/browse/FLINK-35167 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: zhangdingxin >Priority: Major > Labels: Flink-CDC, connector > Original Estimate: 504h > Remaining Estimate: 504h > > By integrating the MaxCompute DataSink, we enable the precise and efficient > synchronization of data from Flink's Change Data Capture (CDC) into > MaxCompute. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35167) Introduce MaxCompute pipeline DataSink
[ https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangdingxin updated FLINK-35167: - Labels: (was: Flink-CDC connector) > Introduce MaxCompute pipeline DataSink > -- > > Key: FLINK-35167 > URL: https://issues.apache.org/jira/browse/FLINK-35167 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: zhangdingxin >Assignee: zhangdingxin >Priority: Major > Original Estimate: 504h > Remaining Estimate: 504h > > By integrating the MaxCompute DataSink, we enable the precise and efficient > synchronization of data from Flink's Change Data Capture (CDC) into > MaxCompute. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-35167) Introduce MaxCompute pipeline DataSink
[ https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Qingsheng Ren reassigned FLINK-35167: - Assignee: zhangdingxin > Introduce MaxCompute pipeline DataSink > -- > > Key: FLINK-35167 > URL: https://issues.apache.org/jira/browse/FLINK-35167 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: zhangdingxin >Assignee: zhangdingxin >Priority: Major > Labels: Flink-CDC, connector > Original Estimate: 504h > Remaining Estimate: 504h > > By integrating the MaxCompute DataSink, we enable the precise and efficient > synchronization of data from Flink's Change Data Capture (CDC) into > MaxCompute. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35167) Introduce MaxCompute pipeline DataSink
[ https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838836#comment-17838836 ] zhangdingxin commented on FLINK-35167: -- I'd like to do it [|https://issues.apache.org/jira/secure/AddComment!default.jspa?id=13575588] > Introduce MaxCompute pipeline DataSink > -- > > Key: FLINK-35167 > URL: https://issues.apache.org/jira/browse/FLINK-35167 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: zhangdingxin >Priority: Major > Labels: Flink-CDC, connector > Original Estimate: 504h > Remaining Estimate: 504h > > By integrating the MaxCompute DataSink, we enable the precise and efficient > synchronization of data from Flink's Change Data Capture (CDC) into > MaxCompute. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]
liyubin117 commented on PR #24630: URL: https://github.com/apache/flink/pull/24630#issuecomment-2065632965 @LadyForest Hi, I have updated the PR as you suggested and rebased it on the latest master branch. CI has passed. PTAL, thanks :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35167) Introduce MaxCompute pipeline DataSink
[ https://issues.apache.org/jira/browse/FLINK-35167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangdingxin updated FLINK-35167: - Summary: Introduce MaxCompute pipeline DataSink (was: [CDC] Introduce MaxCompute pipeline DataSink) > Introduce MaxCompute pipeline DataSink > -- > > Key: FLINK-35167 > URL: https://issues.apache.org/jira/browse/FLINK-35167 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: zhangdingxin >Priority: Major > Labels: Flink-CDC, connector > Original Estimate: 504h > Remaining Estimate: 504h > > By integrating the MaxCompute DataSink, we enable the precise and efficient > synchronization of data from Flink's Change Data Capture (CDC) into > MaxCompute. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35161) Implement StateExecutor for ForStStateBackend
[ https://issues.apache.org/jira/browse/FLINK-35161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jinzhong Li updated FLINK-35161: Component/s: Runtime / State Backends > Implement StateExecutor for ForStStateBackend > - > > Key: FLINK-35161 > URL: https://issues.apache.org/jira/browse/FLINK-35161 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Jinzhong Li >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35163) Utilize ForSt's native MultiGet API to optimize remote state access
[ https://issues.apache.org/jira/browse/FLINK-35163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jinzhong Li updated FLINK-35163: Component/s: Runtime / State Backends > Utilize ForSt's native MultiGet API to optimize remote state access > --- > > Key: FLINK-35163 > URL: https://issues.apache.org/jira/browse/FLINK-35163 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Jinzhong Li >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35162) Support grouping state get and put access
[ https://issues.apache.org/jira/browse/FLINK-35162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jinzhong Li updated FLINK-35162: Component/s: Runtime / State Backends > Support grouping state get and put access > - > > Key: FLINK-35162 > URL: https://issues.apache.org/jira/browse/FLINK-35162 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Jinzhong Li >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35167) [CDC] Introduce MaxCompute pipeline DataSink
zhangdingxin created FLINK-35167: Summary: [CDC] Introduce MaxCompute pipeline DataSink Key: FLINK-35167 URL: https://issues.apache.org/jira/browse/FLINK-35167 Project: Flink Issue Type: New Feature Components: Flink CDC Reporter: zhangdingxin By integrating the MaxCompute DataSink, we enable the precise and efficient synchronization of data from Flink's Change Data Capture (CDC) into MaxCompute. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35166) Improve the performance of Hybrid Shuffle when enable memory decoupling
[ https://issues.apache.org/jira/browse/FLINK-35166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35166: --- Labels: pull-request-available (was: ) > Improve the performance of Hybrid Shuffle when enable memory decoupling > --- > > Key: FLINK-35166 > URL: https://issues.apache.org/jira/browse/FLINK-35166 > Project: Flink > Issue Type: Improvement > Components: Runtime / Network >Reporter: Jiang Xin >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > Currently, the tiered result partition creates the SortBufferAccumulator with > the number of expected buffers as min(numSubpartitions+1, 512), thus the > SortBufferAccumulator may obtain very few buffers when the parallelism is > small. We can easily make the number of expected buffers 512 by default to > have a better performance when the buffers are sufficient. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35166] Make the SortBufferAccumulator use more buffers when the parallelism is small [flink]
jiangxin369 opened a new pull request, #24683: URL: https://github.com/apache/flink/pull/24683

## What is the purpose of the change

Improve the performance of hybrid shuffle when memory decoupling is enabled and the parallelism is small.

## Brief change log

- Make the SortBufferAccumulator use more buffers when the parallelism is small

## Verifying this change

Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35047) Introduce ForStStateBackend to manage ForSt
[ https://issues.apache.org/jira/browse/FLINK-35047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35047: --- Labels: pull-request-available (was: ) > Introduce ForStStateBackend to manage ForSt > --- > > Key: FLINK-35047 > URL: https://issues.apache.org/jira/browse/FLINK-35047 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Major > Labels: pull-request-available > > A ForStStateBackend is introduced to leverage ForSt as state store for Flink. > This ticket includes: > # Life cycle of ForSt, including initialization/closing > # basic options, resource control, metrics like RocksDBStateBackend > It doesn't include the implementation of new AsyncKeyedStateBackend and Async > State API which will be resolved in other tickets. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35047][state] Support ForStStateBackend and ForStKeyedStateBackend [flink]
masteryhx opened a new pull request, #24682: URL: https://github.com/apache/flink/pull/24682

## What is the purpose of the change

A ForStStateBackend is introduced to leverage ForSt as state store for Flink. This ticket includes:

- Life cycle of ForSt, including initialization/closing
- basic options, resource control, metrics like RocksDBStateBackend

It doesn't include the implementation of new AsyncKeyedStateBackend and Async State API which will be resolved in other PRs.

## Brief change log

- Support ForSt FlinkEnv
- Introduce ForStStateBackend and ForStKeyedStateBackend
- Support restoring and building ForSt
- Introduce ResourceContainer for ForStStateBackend
- Introduce Metrics for ForStStateBackend
- Introduce ForSt related options

## Verifying this change

This change added tests and can be verified as follows:

- Added several tests about configs, metrics, memory control, ForSt load

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35166) Improve the performance of Hybrid Shuffle when enable memory decoupling
Jiang Xin created FLINK-35166: - Summary: Improve the performance of Hybrid Shuffle when enable memory decoupling Key: FLINK-35166 URL: https://issues.apache.org/jira/browse/FLINK-35166 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Jiang Xin Fix For: 1.20.0 Currently, the tiered result partition creates the SortBufferAccumulator with the number of expected buffers as min(numSubpartitions+1, 512), thus the SortBufferAccumulator may obtain very few buffers when the parallelism is small. We can easily make the number of expected buffers 512 by default to have a better performance when the buffers are sufficient. -- This message was sent by Atlassian Jira (v8.20.10#820010)
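The sizing rule described in FLINK-35166 can be illustrated with a minimal sketch. This is not the Flink implementation: the class and method names below are hypothetical, and only the formula `min(numSubpartitions + 1, 512)` and the proposed default of 512 come from the issue text.

```java
/** Illustrative only: compares the current and proposed "expected buffers" values. */
public class ExpectedBuffersSketch {

    // Upper bound quoted in the issue description.
    static final int MAX_EXPECTED_BUFFERS = 512;

    /** Current rule per the issue: min(numSubpartitions + 1, 512). */
    static int currentExpectedBuffers(int numSubpartitions) {
        return Math.min(numSubpartitions + 1, MAX_EXPECTED_BUFFERS);
    }

    /** Proposed rule per the issue: expect 512 buffers regardless of parallelism. */
    static int proposedExpectedBuffers(int numSubpartitions) {
        return MAX_EXPECTED_BUFFERS;
    }

    public static void main(String[] args) {
        // With few subpartitions the current rule asks for very few buffers.
        for (int numSubpartitions : new int[] {1, 4, 100, 1000}) {
            System.out.printf(
                    "subpartitions=%d current=%d proposed=%d%n",
                    numSubpartitions,
                    currentExpectedBuffers(numSubpartitions),
                    proposedExpectedBuffers(numSubpartitions));
        }
    }
}
```

With 4 subpartitions the current rule expects only 5 buffers, which is the small-parallelism case the issue wants to improve when buffers are plentiful.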
Re: [PR] [FLINK-35127] remove HybridSource to avoid CI failure. [flink-cdc]
lvyanquan commented on PR #3237: URL: https://github.com/apache/flink-cdc/pull/3237#issuecomment-2065589494 Looks like HybridSource has a requirement for parallelism, but some of our tests use the core number of the machine itself https://github.com/apache/flink/blob/a312a3bdd258e0ff7d6f94e979b32e2bc762b82f/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java#L55 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]
liyubin117 commented on PR #24630: URL: https://github.com/apache/flink/pull/24630#issuecomment-2065504494 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Support skip verify ssl certificate and hostname [flink-connector-elasticsearch]
liuml07 commented on PR #96: URL: https://github.com/apache/flink-connector-elasticsearch/pull/96#issuecomment-2065368913 Thank you @tosone . Could you help review that PR? Also I can update that one to port your change about Table API changes. If you would like to co-author that by pushing to that branch, I'd appreciate it. We may need to ping folks with write permission to merge it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]
afedulov commented on code in PR #24564: URL: https://github.com/apache/flink/pull/24564#discussion_r1571313741

## flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java: ##

@@ -184,6 +184,21 @@ public synchronized ComponentMetricStore getJobManagerMetricStore() {
         return ComponentMetricStore.unmodifiable(jobManager);
     }

+    public synchronized ComponentMetricStore getJobManagerOperatorMetricStore(
+            String jobID, String taskID) {
+        JobMetricStore job = jobID == null ? null : jobs.get(jobID);
+        if (job == null || taskID == null) {
+            return null;
+        }
+
+        TaskMetricStore task = job.getTaskMetricStore(taskID);
+        if (task == null) {
+            return null;
+        }
+
+        return ComponentMetricStore.unmodifiable(task.getJobManagerOperatorMetricStore());
+    }

Review Comment: Feels like this could be a bit more readable

```suggestion
    public synchronized ComponentMetricStore getJobManagerOperatorMetricStore(String jobID, String taskID) {
        if (jobID == null || taskID == null) {
            return null;
        }
        JobMetricStore job = jobs.get(jobID);
        if (job == null) {
            return null;
        }
        TaskMetricStore task = job.getTaskMetricStore(taskID);
        if (task == null) {
            return null;
        }
        return ComponentMetricStore.unmodifiable(task.getJobManagerOperatorMetricStore());
    }
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]
pvary commented on PR #3233: URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2065143602 Good catch @loserwang1024! Could you please add a test case to prevent later code changes from reverting this fix? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35165) AdaptiveBatch Scheduler should not restrict the default source parallelism to the max parallelism set
Venkata krishnan Sowrirajan created FLINK-35165: --- Summary: AdaptiveBatch Scheduler should not restrict the default source parallelism to the max parallelism set Key: FLINK-35165 URL: https://issues.apache.org/jira/browse/FLINK-35165 Project: Flink Issue Type: Bug Components: Runtime / Coordination Reporter: Venkata krishnan Sowrirajan

Copy-pasting the reasoning mentioned on this [discussion thread|https://lists.apache.org/thread/o887xhvvmn2rg5tyymw348yl2mqt23o7].

Let me state why I think "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism".

* The source vertex is unique and does not have any upstream vertices
* Downstream vertices read shuffled data partitioned by key, which is not the case for the source vertex
* Limiting source parallelism by downstream vertices' max parallelism is incorrect
* If we say that for "semantic consistency" the source vertex parallelism has to be bound by the overall job's max parallelism, it can lead to the following issues:
** High filter selectivity with huge amounts of data to read
** Setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism" so that source parallelism can be set higher can lead to small blocks and sub-optimal performance.
** Setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism" requires careful tuning of network buffer configurations, which is unnecessary in cases where it is not required, just so that the source parallelism can be set high.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
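The coupling that FLINK-35165 objects to can be sketched as follows. This is an illustration of the reasoning only, not the scheduler's actual code: the class and method names are hypothetical, and the only facts taken from the issue are that the default source parallelism is currently restricted by the configured max parallelism and that the reporter wants that restriction lifted.

```java
/** Illustrative only: bounded vs. unbounded default source parallelism. */
public class SourceParallelismSketch {

    /** Behaviour described as the problem: the source default is capped by the job-wide maximum. */
    static int boundedSourceParallelism(int defaultSourceParallelism, int maxParallelism) {
        return Math.min(defaultSourceParallelism, maxParallelism);
    }

    /** Behaviour the issue asks for: the source default is used as configured. */
    static int unboundedSourceParallelism(int defaultSourceParallelism, int maxParallelism) {
        return defaultSourceParallelism;
    }

    public static void main(String[] args) {
        int defaultSourceParallelism = 2000; // hypothetical: a large scan with a selective filter
        int maxParallelism = 128;            // hypothetical: sized for the downstream vertices

        System.out.println(
                "bounded=" + boundedSourceParallelism(defaultSourceParallelism, maxParallelism));
        System.out.println(
                "unbounded=" + unboundedSourceParallelism(defaultSourceParallelism, maxParallelism));
    }
}
```

Under the bounded rule, the only way to read with 2000 source subtasks is to raise the job-wide maximum as well, which is what the issue says leads to small blocks and extra network buffer tuning.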
Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]
afedulov commented on PR #23777: URL: https://github.com/apache/flink/pull/23777#issuecomment-2064985125 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35115][Connectors/Kinesis] Allow kinesis consumer to snapshotState after operator had been cancelled [flink-connector-aws]
z3d1k commented on code in PR #138: URL: https://github.com/apache/flink-connector-aws/pull/138#discussion_r1571233482 ## flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java: ## @@ -312,6 +316,148 @@ public void testListStateChangedAfterSnapshotState() throws Exception { } } +@Test +public void testSnapshotStateDuringStopWithSavepoint() throws Exception { + +// -- +// setup config, initial state and expected state snapshot +// -- +Properties config = TestUtils.getStandardProperties(); + +ArrayList> initialState = new ArrayList<>(1); +initialState.add( +Tuple2.of( +KinesisDataFetcher.convertToStreamShardMetadata( +new StreamShardHandle( +"fakeStream1", +new Shard() +.withShardId( +KinesisShardIdGenerator + .generateFromShardOrder(0, +new SequenceNumber("11"))); + +ArrayList> expectedStateSnapshot1 = +new ArrayList<>(1); +expectedStateSnapshot1.add( +Tuple2.of( +KinesisDataFetcher.convertToStreamShardMetadata( +new StreamShardHandle( +"fakeStream1", +new Shard() +.withShardId( +KinesisShardIdGenerator + .generateFromShardOrder(0, +new SequenceNumber("12"))); +ArrayList> expectedStateSnapshot2 = +new ArrayList<>(1); +expectedStateSnapshot2.add( +Tuple2.of( +KinesisDataFetcher.convertToStreamShardMetadata( +new StreamShardHandle( +"fakeStream1", +new Shard() +.withShardId( +KinesisShardIdGenerator + .generateFromShardOrder(0, +new SequenceNumber("13"))); + +// -- +// mock operator state backend and initial state for initializeState() +// -- + +TestingListState> listState = +new TestingListState<>(); +for (Tuple2 state : initialState) { +listState.add(state); +} + +OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); Review Comment: @vahmed-hamdy existing test util currently does not support `getUnionListState`. As pointed by @dannycranmer, this connector is on deprecation path. Split test into separate cases and extracted some of the common logic to reduce size. 
Re: [PR] [FLINK-35115][Connectors/Kinesis] Allow kinesis consumer to snapshotState after operator had been cancelled [flink-connector-aws]
z3d1k commented on code in PR #138: URL: https://github.com/apache/flink-connector-aws/pull/138#discussion_r1571230798 ## flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java: ## @@ -149,6 +149,7 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction sequenceNumsToRestore; private volatile boolean running = true; +private volatile boolean closed = false; Review Comment: Added comments for these flags -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment [flink]
zentol commented on code in PR #24680: URL: https://github.com/apache/flink/pull/24680#discussion_r1571216654 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java: ## @@ -93,8 +93,12 @@ void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() { ignored -> CreatingExecutionGraph.AssignmentResult.notPossible()); context.setExpectWaitingForResources(); +final StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph(); + executionGraphWithVertexParallelismFuture.complete( -getGraph(new StateTrackingMockExecutionGraph())); +getGraph(executionGraph)); + + assertThat(executionGraph.getState()).isEqualTo(JobStatus.INITIALIZING); Review Comment: > the graph would be running before the change yes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment [flink]
zentol commented on code in PR #24680: URL: https://github.com/apache/flink/pull/24680#discussion_r1571218899 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java: ## @@ -166,18 +170,19 @@ static class MockCreatingExecutionGraphContext extends MockStateWithoutExecution new StateValidator<>("Executing"); private Function< - CreatingExecutionGraph.ExecutionGraphWithVertexParallelism, -CreatingExecutionGraph.AssignmentResult> +CreatingExecutionGraph.ExecutionGraphWithVertexParallelism, +CreatingExecutionGraph.AssignmentResult> tryToAssignSlotsFunction = -e -> CreatingExecutionGraph.AssignmentResult.success(e.getExecutionGraph()); +e -> CreatingExecutionGraph.AssignmentResult.success(e.getExecutionGraph()); private GlobalFailureHandler globalFailureHandler = t -> { // No-op. }; public void setExpectWaitingForResources() { -waitingForResourcesStateValidator.expectInput((none) -> {}); +waitingForResourcesStateValidator.expectInput((none) -> { Review Comment: No, I screwed up :) My IntelliJ is using a Spotless config that isn't fully compatible with Flink... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment [flink]
zentol commented on code in PR #24680: URL: https://github.com/apache/flink/pull/24680#discussion_r1571216390 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ## @@ -1783,56 +1778,6 @@ void testStopWithSavepointFailsInIllegalState() throws Exception { .withCauseInstanceOf(CheckpointException.class); } -@Test -void testSavepointFailsWhenBlockingEdgeExists() throws Exception { Review Comment: This was added in FLINK-34371 and is covered by tests in the DefaultScheduler: https://github.com/apache/flink/commit/d4e0084649c019c536ee1e44bab15c8eca01bf13#diff-b4bc1cd606feb86850a18371520b5dd63d02090b8567fdf80730d1d7dd6e693d -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment [flink]
dmvk commented on code in PR #24680: URL: https://github.com/apache/flink/pull/24680#discussion_r1571177099 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java: ## @@ -166,18 +170,19 @@ static class MockCreatingExecutionGraphContext extends MockStateWithoutExecution new StateValidator<>("Executing"); private Function< - CreatingExecutionGraph.ExecutionGraphWithVertexParallelism, -CreatingExecutionGraph.AssignmentResult> +CreatingExecutionGraph.ExecutionGraphWithVertexParallelism, +CreatingExecutionGraph.AssignmentResult> tryToAssignSlotsFunction = -e -> CreatingExecutionGraph.AssignmentResult.success(e.getExecutionGraph()); +e -> CreatingExecutionGraph.AssignmentResult.success(e.getExecutionGraph()); private GlobalFailureHandler globalFailureHandler = t -> { // No-op. }; public void setExpectWaitingForResources() { -waitingForResourcesStateValidator.expectInput((none) -> {}); +waitingForResourcesStateValidator.expectInput((none) -> { Review Comment: uuh, what's happening here? Did we update the formatter? ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ## @@ -1783,56 +1778,6 @@ void testStopWithSavepointFailsInIllegalState() throws Exception { .withCauseInstanceOf(CheckpointException.class); } -@Test -void testSavepointFailsWhenBlockingEdgeExists() throws Exception { Review Comment: Do you know why / when this was introduced? It indeed doesn't seem to be related to the AS; is this tested somewhere else? 
## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java: ## @@ -93,8 +93,12 @@ void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() { ignored -> CreatingExecutionGraph.AssignmentResult.notPossible()); context.setExpectWaitingForResources(); +final StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph(); + executionGraphWithVertexParallelismFuture.complete( -getGraph(new StateTrackingMockExecutionGraph())); +getGraph(executionGraph)); + + assertThat(executionGraph.getState()).isEqualTo(JobStatus.INITIALIZING); Review Comment: iiuc, the graph would be running before the change; makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Adding JDBC Connector v3.2.0 [flink-web]
Samrat002 commented on code in PR #734: URL: https://github.com/apache/flink-web/pull/734#discussion_r1571145570 ## docs/data/flink_connectors.yml: ## @@ -51,11 +51,11 @@ hbase: compatibility: ["1.16.x", "1.17.x"] jdbc: - name: "Apache Flink JDBC Connector 3.1.2" - source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.1.2/flink-connector-jdbc-3.1.2-src.tgz" - source_release_asc_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.1.2/flink-connector-jdbc-3.1.2-src.tgz.asc" - source_release_sha512_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.1.2/flink-connector-jdbc-3.1.2-src.tgz.sha512" - compatibility: ["1.16.x", "1.17.x", "1.18.x"] + name: "Apache Flink JDBC Connector 3.2.0" + source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.2.0/flink-connector-jdbc-3.2.0-src.tgz" + source_release_asc_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.2.0/flink-connector-jdbc-3.2.0-src.tgz.asc" + source_release_sha512_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.2.0/flink-connector-jdbc-3.2.0-src.tgz.sha512" + compatibility: ["1.18.x", "1.19.x"] Review Comment: [NIT] Reduced supported versions in JDBC connector: Previously 3 versions (1.16.x, 1.17.x, 1.18.x), now 2 (1.18.x, 1.19.x). Any specific reason for the change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Adding JDBC Connector v3.2.0 [flink-web]
Samrat002 commented on code in PR #734: URL: https://github.com/apache/flink-web/pull/734#discussion_r1571145570 ## docs/data/flink_connectors.yml: ## @@ -51,11 +51,11 @@ hbase: compatibility: ["1.16.x", "1.17.x"] jdbc: - name: "Apache Flink JDBC Connector 3.1.2" - source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.1.2/flink-connector-jdbc-3.1.2-src.tgz" - source_release_asc_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.1.2/flink-connector-jdbc-3.1.2-src.tgz.asc" - source_release_sha512_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.1.2/flink-connector-jdbc-3.1.2-src.tgz.sha512" - compatibility: ["1.16.x", "1.17.x", "1.18.x"] + name: "Apache Flink JDBC Connector 3.2.0" + source_release_url: "https://www.apache.org/dyn/closer.lua/flink/flink-connector-jdbc-3.2.0/flink-connector-jdbc-3.2.0-src.tgz" + source_release_asc_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.2.0/flink-connector-jdbc-3.2.0-src.tgz.asc" + source_release_sha512_url: "https://downloads.apache.org/flink/flink-connector-jdbc-3.2.0/flink-connector-jdbc-3.2.0-src.tgz.sha512" + compatibility: ["1.18.x", "1.19.x"] Review Comment: [NIT] Earlier, the JDBC connector supported 3 versions (1.16.x, 1.17.x, and 1.18.x); now it supports only 2 (1.18.x and 1.19.x). Any specific reason? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35159] Transition ExecutionGraph to RUNNING in Executing state [flink]
zentol commented on PR #24680: URL: https://github.com/apache/flink/pull/24680#issuecomment-2064570934 meh, stop-with-savepoint failures need to be able to transition back into executing without recreating the EG :/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]
liyubin117 commented on PR #24630: URL: https://github.com/apache/flink/pull/24630#issuecomment-2064501044 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33460][Connector/JDBC] Support property authentication connection. [flink-connector-jdbc]
eskabetxe commented on code in PR #115: URL: https://github.com/apache/flink-connector-jdbc/pull/115#discussion_r1571058904 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcConnectionOptions.java: ## @@ -90,6 +101,11 @@ public JdbcConnectionOptionsBuilder withDriverName(String driverName) { return this; } +public JdbcConnectionOptionsBuilder withExtendProperties(Properties extendProperties) { Review Comment: Wouldn't it be better to have a method that adds a single property?
```
private Properties properties = new Properties();

public JdbcConnectionOptionsBuilder withConfig(String config, String value) {
    this.properties.put(config, value);
    return this;
}
```
This way we could also start thinking about moving user, password, and driver into it, as they will always be used inside the properties anyway, which would simplify some logic in [SimpleJdbcConnectionProvider.java](https://github.com/apache/flink-connector-jdbc/pull/115/files#diff-1a261021e67f437344c5b6b2b5da274ff6914c51d8b5163ef30f03e6ba3614c5)
```
public JdbcConnectionOptionsBuilder withUsername(String username) {
    this.properties.put("user", username);
    return this;
}
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
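The suggestion in the review comment above can be sketched as a small, self-contained builder. This is only an illustration under assumptions: `ConnectionOptionsSketch`, `Builder`, `withProperty`, and `buildProperties` are hypothetical names, not the actual `JdbcConnectionOptions` API from the PR. The only fact relied on is that JDBC drivers conventionally read credentials from the `user` and `password` keys of a `java.util.Properties` object.

```java
import java.util.Properties;

/** Hypothetical stand-in for a connection-options builder; NOT the Flink class. */
public class ConnectionOptionsSketch {

    public static final class Builder {
        // All settings live in one Properties object, as the comment proposes.
        private final Properties properties = new Properties();

        /** Adds a single arbitrary property (hypothetical method name). */
        public Builder withProperty(String key, String value) {
            properties.setProperty(key, value);
            return this;
        }

        /** Convenience wrapper: stores the user name under the conventional JDBC key. */
        public Builder withUsername(String username) {
            return withProperty("user", username);
        }

        /** Convenience wrapper: stores the password under the conventional JDBC key. */
        public Builder withPassword(String password) {
            return withProperty("password", password);
        }

        /** Returns a defensive copy so later builder calls cannot mutate the result. */
        public Properties buildProperties() {
            Properties copy = new Properties();
            copy.putAll(properties);
            return copy;
        }
    }

    public static void main(String[] args) {
        Properties props = new Builder()
                .withUsername("flink")
                .withPassword("secret")
                .withProperty("ssl", "true")
                .buildProperties();
        System.out.println(props.getProperty("user"));
        System.out.println(props.getProperty("ssl"));
    }
}
```

With this shape, a connection provider could pass the resulting `Properties` directly to `DriverManager.getConnection(url, properties)` instead of handling user and password as separate fields; whether the PR adopts that design is not stated in the thread.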
Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]
liyubin117 commented on PR #24630: URL: https://github.com/apache/flink/pull/24630#issuecomment-2064332698 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35164) Support `ALTER CATALOG RESET` syntax
Yubin Li created FLINK-35164: Summary: Support `ALTER CATALOG RESET` syntax Key: FLINK-35164 URL: https://issues.apache.org/jira/browse/FLINK-35164 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Yubin Li Attachments: image-2024-04-18-23-26-59-854.png h3. ALTER CATALOG catalog_name RESET (key1, key2, ...) Reset one or more properties to their default values in the specified catalog. !image-2024-04-18-23-26-59-854.png|width=781,height=527! -- This message was sent by Atlassian Jira (v8.20.10#820010)
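The intended semantics of `ALTER CATALOG catalog_name RESET (key1, key2, ...)` can be illustrated with a minimal sketch. It assumes, purely for illustration, that a catalog's explicit options are held in a map and that a key missing from that map falls back to a default; the issue does not describe the implementation, and `CatalogResetSketch`, `reset`, and `effectiveValue` are hypothetical names rather than Flink APIs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of RESET semantics; not Flink's catalog implementation. */
public class CatalogResetSketch {

    /** Returns a copy of the explicit options with the given keys removed. */
    public static Map<String, String> reset(Map<String, String> options, List<String> keys) {
        Map<String, String> result = new HashMap<>(options);
        keys.forEach(result::remove);
        return result;
    }

    /** Looks up a key, falling back to the default when no explicit value is set. */
    public static String effectiveValue(
            Map<String, String> options, Map<String, String> defaults, String key) {
        return options.getOrDefault(key, defaults.get(key));
    }

    public static void main(String[] args) {
        // Assumed example keys and values, chosen only for the demonstration.
        Map<String, String> defaults = Map.of("default-database", "default");
        Map<String, String> options = new HashMap<>();
        options.put("default-database", "sales");
        options.put("comment", "demo");

        // Models: ALTER CATALOG cat RESET ('default-database')
        Map<String, String> afterReset = reset(options, List.of("default-database"));

        System.out.println(effectiveValue(options, defaults, "default-database"));
        System.out.println(effectiveValue(afterReset, defaults, "default-database"));
    }
}
```

Under these assumptions the first lookup yields the explicit value and the second yields the default, which is the behavior the issue summary describes.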
[jira] [Updated] (FLINK-34916) Support `ALTER CATALOG SET` syntax
[ https://issues.apache.org/jira/browse/FLINK-34916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yubin Li updated FLINK-34916: - Summary: Support `ALTER CATALOG SET` syntax (was: Support `ALTER CATALOG` syntax) > Support `ALTER CATALOG SET` syntax > -- > > Key: FLINK-34916 > URL: https://issues.apache.org/jira/browse/FLINK-34916 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: Yubin Li >Assignee: Yubin Li >Priority: Major > Attachments: image-2024-03-22-18-30-33-182.png > > > Set one or more properties in the specified catalog. If a particular property > is already set in the catalog, override the old value with the new one. > !image-2024-03-22-18-30-33-182.png|width=736,height=583! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-28048][connectors] Introduce Source API alternative to FiniteTestSource [flink]
afedulov commented on PR #23777: URL: https://github.com/apache/flink/pull/23777#issuecomment-2064139948 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35053) TIMESTAMP with LOCAL TIME ZONE not supported by JDBC connector for Postgres
[ https://issues.apache.org/jira/browse/FLINK-35053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pietro updated FLINK-35053: --- Description: The JDBC sink for Postgres does not support {{{}TIMESTAMP_LTZ{}}}, nor {{TIMESTAMP WITH TIME ZONE}} types. Related issues: FLINK-22199, FLINK-20869 h2. Problem Explanation A Postgres {{target_table}} has a field {{tm_tz}} of type {{timestamptz}} . {code:sql} -- Postgres DDL CREATE TABLE target_table ( tm_tz TIMESTAMP WITH TIME ZONE ) {code} In Flink we have a table with a column of type {{{}TIMESTAMP_LTZ(6){}}}, and our goal is to sink it to {{{}target_table{}}}. {code:sql} -- Flink DDL CREATE TABLE sink ( tm_tz TIMESTAMP_LTZ(6) ) WITH ( 'connector' = 'jdbc', 'table-name' = 'target_table' ... ) {code} According to [AbstractPostgresCompatibleDialect.supportedTypes()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java#L109], {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is supported, while {{TIMESTAMP_WITH_TIME_ZONE}} is not. However, when the converter is created via [AbstractJdbcRowConverter.externalConverter()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L246], it throws an {{UnsupportedOperationException}} since {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is *not* among the available types, while [{{TIMESTAMP_WITH_TIME_ZONE}}|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L168] is. 
{code:java} Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported type:TIMESTAMP_LTZ(6) at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186) at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99) at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58) at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118) at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.(AbstractJdbcRowConverter.java:68) at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.(PostgresRowConverter.java:47) at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect.getRowConverter(PostgresDialect.java:51) at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:184) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:478) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161) at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4002) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2872) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2432) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2346) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2291) at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:728) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:714) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3848) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:618) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:229) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205) at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69) at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48) at
[jira] [Updated] (FLINK-35053) TIMESTAMP with LOCAL TIME ZONE not supported by JDBC connector for Postgres
[ https://issues.apache.org/jira/browse/FLINK-35053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pietro updated FLINK-35053: --- Summary: TIMESTAMP with LOCAL TIME ZONE not supported by JDBC connector for Postgres (was: TIMESTAMP with TIME ZONE not supported by JDBC connector for Postgres) > TIMESTAMP with LOCAL TIME ZONE not supported by JDBC connector for Postgres > --- > > Key: FLINK-35053 > URL: https://issues.apache.org/jira/browse/FLINK-35053 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC >Affects Versions: 1.19.0, 1.18.1, jdbc-3.1.2 >Reporter: Pietro >Priority: Major > Attachments: Timestamp.png, TimestampData.png, > createExternalConverter.png > > > The JDBC sink for Postgres does not support {{{}TIMESTAMP WITH TIME ZONE{}}}, > nor {{TIMESTAMP_LTZ}} types. > Related issues: FLINK-22199, FLINK-20869 > h2. Problem Explanation > A Postgres {{target_table}} has a field {{tm_tz}} of type {{timestamptz}} . > {code:sql} > -- Postgres DDL > CREATE TABLE target_table ( > tm_tz TIMESTAMP WITH TIME ZONE > ) > {code} > In Flink we have a table with a column of type {{{}TIMESTAMP_LTZ(6){}}}, and > our goal is to sink it to {{{}target_table{}}}. > {code:sql} > -- Flink DDL > CREATE TABLE sink ( > tm_tz TIMESTAMP_LTZ(6) > ) WITH ( > 'connector' = 'jdbc', > 'table-name' = 'target_table' > ... > ) > {code} > According to > [AbstractPostgresCompatibleDialect.supportedTypes()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java#L109], > {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is supported, while > {{TIMESTAMP_WITH_TIME_ZONE}} is not. 
> However, when the converter is created via > [AbstractJdbcRowConverter.externalConverter()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L246], > it throws an {{UnsupportedOperationException}} since > {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is *not* among the available types, while > [{{TIMESTAMP_WITH_TIME_ZONE}}|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L168] > is. > {code:java} > Exception in thread "main" java.lang.UnsupportedOperationException: > Unsupported type:TIMESTAMP_LTZ(6) > at > org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186) > at > org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99) > at > org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58) > at > org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118) > at > org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.(AbstractJdbcRowConverter.java:68) > at > org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.(PostgresRowConverter.java:47) > at > org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect.getRowConverter(PostgresDialect.java:51) > at > org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:184) > at > org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:478) > at > 
org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161) > at > org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125) > at > org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118) > at > org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4002) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2872) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2432) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2346) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2291) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:728) > at >
Re: [PR] [FLINK-35155] Introduce TableRuntimeException [flink]
twalthr commented on code in PR #24679: URL: https://github.com/apache/flink/pull/24679#discussion_r1570922395 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions; + +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.types.Row; + +import java.util.Arrays; +import java.util.stream.Stream; + +import static org.apache.flink.table.api.DataTypes.*; +import static org.apache.flink.types.RowKind.*; + +/** Tests for built-in ARRAY_AGG aggregation functions. */ +class MiscAggFunctionITCase extends BuiltInAggregateFunctionTestBase { + +@Override +Stream getTestCaseSpecs() { +return Stream.of( +TestSpec.forExpression("SINGLE_VALUE") +.withSource( +ROW(STRING(), INT()), +Arrays.asList( +Row.ofKind(INSERT, "A", 1), +Row.ofKind(INSERT, "A", 2), +Row.ofKind(INSERT, "B", 2))) +.testSqlRuntimeError( +source -> +"SELECT f0, SINGLE_VALUE(f1) FROM " ++ source ++ " GROUP BY f0", +ROW(STRING(), INT()), +ex -> + ex.hasRootCauseInstanceOf(TableRuntimeException.class) Review Comment: Yes, let's strive for this. 
It should be our end goal. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35155] Introduce TableRuntimeException [flink]
dawidwys commented on PR #24679: URL: https://github.com/apache/flink/pull/24679#issuecomment-2063994171 > Let's already fix some other locations with this exception: for example: AbstractCodeGeneratorCastRule and CastExecutor SqlJsonUtils Sure, any other places that you're aware of? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]
flinkbot commented on PR #24681: URL: https://github.com/apache/flink/pull/24681#issuecomment-2063985516 ## CI report: * b1abbbc14798703d82a9115d42141aa0bb62abbc UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]
ljz2051 commented on PR #24681: URL: https://github.com/apache/flink/pull/24681#issuecomment-2063978444 @Zakelly @fredia @masteryhx Could you please take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35162) Support grouping state get and put access
[ https://issues.apache.org/jira/browse/FLINK-35162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35162: --- Labels: pull-request-available (was: ) > Support grouping state get and put access > - > > Key: FLINK-35162 > URL: https://issues.apache.org/jira/browse/FLINK-35162 > Project: Flink > Issue Type: Sub-task >Reporter: Jinzhong Li >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]
ljz2051 opened a new pull request, #24681: URL: https://github.com/apache/flink/pull/24681 ## What is the purpose of the change This pull request introduces the ForStValueState and supports WriteBatchOperation and general MultiGetOperation for ForSt. ## Brief change log - Introduces the ForStValueState - Supports WriteBatchOperation and general MultiGetOperation for ForSt ## Verifying this change This change added tests and can be verified by ForStWriteBatchOperationTest and ForStGeneralMultiGetOperationTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35155] Introduce TableRuntimeException [flink]
dawidwys commented on code in PR #24679: URL: https://github.com/apache/flink/pull/24679#discussion_r1570850917 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java: ## @@ -344,6 +361,38 @@ public void execute(TableEnvironment tEnv, Table sourceTable) { protected abstract TableResult getResult(TableEnvironment tEnv, Table sourceTable); } +private abstract static class RuntimeFailureItem implements TestItem { Review Comment: I followed the `SuccessItem` convention. `Success/Failure`, but sure I'll rename it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35155] Introduce TableRuntimeException [flink]
dawidwys commented on code in PR #24679: URL: https://github.com/apache/flink/pull/24679#discussion_r1570846758 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions; + +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.types.Row; + +import java.util.Arrays; +import java.util.stream.Stream; + +import static org.apache.flink.table.api.DataTypes.*; +import static org.apache.flink.types.RowKind.*; + +/** Tests for built-in ARRAY_AGG aggregation functions. 
*/ +class MiscAggFunctionITCase extends BuiltInAggregateFunctionTestBase { + +@Override +Stream getTestCaseSpecs() { +return Stream.of( +TestSpec.forExpression("SINGLE_VALUE") +.withSource( +ROW(STRING(), INT()), +Arrays.asList( +Row.ofKind(INSERT, "A", 1), +Row.ofKind(INSERT, "A", 2), +Row.ofKind(INSERT, "B", 2))) +.testSqlRuntimeError( +source -> +"SELECT f0, SINGLE_VALUE(f1) FROM " ++ source ++ " GROUP BY f0", +ROW(STRING(), INT()), +ex -> + ex.hasRootCauseInstanceOf(TableRuntimeException.class) Review Comment: Are you confident that's the only exception we will ever want to test against? If that's the case, sure I can hardcode it for that exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35125][state] Implement ValueState for ForStStateBackend [flink]
ljz2051 closed pull request #24671: [FLINK-35125][state] Implement ValueState for ForStStateBackend URL: https://github.com/apache/flink/pull/24671 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35163) Utilize ForSt's native MultiGet API to optimize remote state access
Jinzhong Li created FLINK-35163: --- Summary: Utilize ForSt's native MultiGet API to optimize remote state access Key: FLINK-35163 URL: https://issues.apache.org/jira/browse/FLINK-35163 Project: Flink Issue Type: Sub-task Reporter: Jinzhong Li Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35162) Support grouping state get and put access
Jinzhong Li created FLINK-35162: --- Summary: Support grouping state get and put access Key: FLINK-35162 URL: https://issues.apache.org/jira/browse/FLINK-35162 Project: Flink Issue Type: Sub-task Reporter: Jinzhong Li -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35161) Implement StateExecutor for ForStStateBackend
Jinzhong Li created FLINK-35161: --- Summary: Implement StateExecutor for ForStStateBackend Key: FLINK-35161 URL: https://issues.apache.org/jira/browse/FLINK-35161 Project: Flink Issue Type: Sub-task Reporter: Jinzhong Li Fix For: 2.0.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]
liyubin117 commented on PR #24630: URL: https://github.com/apache/flink/pull/24630#issuecomment-2063913126 @LadyForest Hi, I have updated as you said and CI passed, PTAL, thanks :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35160) Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks
elon_X created FLINK-35160: -- Summary: Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks Key: FLINK-35160 URL: https://issues.apache.org/jira/browse/FLINK-35160 Project: Flink Issue Type: Improvement Components: Runtime / REST Affects Versions: 1.18.1, 1.19.0, 1.17.1, 1.16.0 Reporter: elon_X Attachments: image-2024-04-18-20-57-52-440.png, image-2024-04-18-20-58-09-872.png, image-2024-04-18-21-00-04-532.png, image-2024-04-18-21-01-22-881.png, image-2024-04-18-21-34-41-014.png After receiving feedback from the business side about performance issues in their tasks, we attempted to troubleshoot and discovered that their tasks had thread deadlocks. However, the Thread Dump entry on the Flink page only shows thread stacks. Since the users are not very familiar with Java stacks, they could not clearly identify that the deadlocks were caused by the business logic code and mistakenly thought they were problems in the Flink framework. !image-2024-04-18-20-57-52-440.png! !image-2024-04-18-20-58-09-872.png! The JVM's jstack command can clearly display thread deadlocks. Unfortunately, the business team does not have permission to log into the machines. Here is the jstack log: !image-2024-04-18-21-00-04-532.png! Flame graphs are excellent for visualizing performance bottlenecks and hotspots in application profiling, but they are not designed to pinpoint the exact lines of code where thread deadlocks occur. !image-2024-04-18-21-01-22-881.png! Perhaps we could enhance the Thread Dump feature to display thread deadlocks, similar to what the {{jstack}} command provides. !image-2024-04-18-21-34-41-014.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
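The deadlock summary that {{jstack}} prints is also available inside the JVM through the standard `java.lang.management.ThreadMXBean` API, which is one way the request above could be approached. The following is a minimal sketch only: `DeadlockReport` and its output format are hypothetical and are not part of Flink or of any proposed patch.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper for illustration; not a Flink class. */
final class DeadlockReport {

    private DeadlockReport() {}

    /** Returns a jstack-like summary of deadlocked threads, or a fixed message if there are none. */
    static String describe() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        // Detects cycles over both monitors (synchronized) and ownable synchronizers
        // (java.util.concurrent locks). Returns null when no deadlock exists.
        long[] ids = threads.findDeadlockedThreads();
        if (ids == null) {
            return "No deadlocked threads detected.";
        }
        StringBuilder report =
                new StringBuilder("Found " + ids.length + " deadlocked threads:\n");
        for (ThreadInfo info : threads.getThreadInfo(ids, true, true)) {
            if (info == null) {
                continue; // the thread terminated between the two calls
            }
            report.append('"').append(info.getThreadName())
                    .append("\" is waiting for ").append(info.getLockName())
                    .append(" held by \"").append(info.getLockOwnerName()).append("\"\n");
            for (StackTraceElement frame : info.getStackTrace()) {
                report.append("    at ").append(frame).append('\n');
            }
        }
        return report.toString();
    }
}
```

Calling `DeadlockReport.describe()` from a diagnostics endpoint would give users the waiting-for/held-by chain without shell access to the machine; how such output would be wired into the existing Thread Dump page is left open here.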
Re: [PR] Support skip verify ssl certificate and hostname [flink-connector-elasticsearch]
tosone commented on PR #96: URL: https://github.com/apache/flink-connector-elasticsearch/pull/96#issuecomment-2063868124 It seems that the commit has already done the same thing. I hope that the commit can be merged as soon as possible. @liuml07 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35159] Transition ExecutionGraph to RUNNING in Executing state [flink]
flinkbot commented on PR #24680: URL: https://github.com/apache/flink/pull/24680#issuecomment-2063860095 ## CI report: * bac810fd425ebf07f82c84a41611e8b873edb9e4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35159) CreatingExecutionGraph can leak CheckpointCoordinator and cause JM crash
[ https://issues.apache.org/jira/browse/FLINK-35159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35159: --- Labels: pull-request-available (was: ) > CreatingExecutionGraph can leak CheckpointCoordinator and cause JM crash > > > Key: FLINK-35159 > URL: https://issues.apache.org/jira/browse/FLINK-35159 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > When a task manager dies while the JM is generating an ExecutionGraph in the > background, {{CreatingExecutionGraph#handleExecutionGraphCreation}} can > transition back into WaitingForResources if the TM hosted one of the slots > that we planned to use in {{tryToAssignSlots}}. > At this point the ExecutionGraph was already transitioned to running, which > implicitly kicks off periodic checkpointing by the CheckpointCoordinator, > without the operator coordinator holders being initialized yet (as this > happens after we assigned slots). > This effectively leaks that CheckpointCoordinator, including the timer thread > that will continue to try triggering checkpoints, which will naturally fail > to trigger. > This can cause a JM crash because it results in > {{OperatorCoordinatorHolder#abortCurrentTriggering}} being called, which > fails with an NPE since the {{mainThreadExecutor}} was not initialized yet. 
> {code} > java.util.concurrent.CompletionException: > java.util.concurrent.CompletionException: java.lang.NullPointerException > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$8(CheckpointCoordinator.java:707) > at > java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) > at > java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) > at > java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:910) > at > java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > Caused by: java.util.concurrent.CompletionException: > java.lang.NullPointerException > at > java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) > at > java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) > at > java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:932) > at > java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) > ... 
7 more > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.abortCurrentTriggering(OperatorCoordinatorHolder.java:388) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) > at > java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:985) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:961) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:693) > at > java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) > ... 8 more > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35155] Introduce TableRuntimeException [flink]
twalthr commented on code in PR #24679: URL: https://github.com/apache/flink/pull/24679#discussion_r1570701741 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java: ## @@ -344,6 +361,38 @@ public void execute(TableEnvironment tEnv, Table sourceTable) { protected abstract TableResult getResult(TableEnvironment tEnv, Table sourceTable); } +private abstract static class RuntimeFailureItem implements TestItem { Review Comment: call it `RuntimeErrorItem` to be in sync with subclasses ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MiscAggFunctionITCase.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions; + +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.types.Row; + +import java.util.Arrays; +import java.util.stream.Stream; + +import static org.apache.flink.table.api.DataTypes.*; +import static org.apache.flink.types.RowKind.*; + +/** Tests for built-in ARRAY_AGG aggregation functions. 
*/ +class MiscAggFunctionITCase extends BuiltInAggregateFunctionTestBase { + +@Override +Stream getTestCaseSpecs() { +return Stream.of( +TestSpec.forExpression("SINGLE_VALUE") +.withSource( +ROW(STRING(), INT()), +Arrays.asList( +Row.ofKind(INSERT, "A", 1), +Row.ofKind(INSERT, "A", 2), +Row.ofKind(INSERT, "B", 2))) +.testSqlRuntimeError( +source -> +"SELECT f0, SINGLE_VALUE(f1) FROM " ++ source ++ " GROUP BY f0", +ROW(STRING(), INT()), +ex -> + ex.hasRootCauseInstanceOf(TableRuntimeException.class) Review Comment: can we simplify the test base and test always against `TableRuntimeException`? so the method can only take the expected message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
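The review suggestion above (always test against `TableRuntimeException` and pass only the expected message) could look roughly like the sketch below. It is written in plain Java so that it compiles on its own: `RootCauseCheck` and `assertTableRuntimeError` are hypothetical names, and the nested `TableRuntimeException` is a local stand-in for `org.apache.flink.table.api.TableRuntimeException`, not the real class. The actual test base uses AssertJ, whose `hasRootCauseInstanceOf` treats a throwable without a cause slightly differently from the helper here.

```java
/** Hypothetical sketch of a simplified runtime-error assertion; not the code from the PR. */
final class RootCauseCheck {

    /** Local stand-in for Flink's TableRuntimeException so the sketch is self-contained. */
    static class TableRuntimeException extends RuntimeException {
        TableRuntimeException(String message) {
            super(message);
        }
    }

    private RootCauseCheck() {}

    /** Walks the cause chain to its end; returns the throwable itself if it has no cause. */
    static Throwable rootCause(Throwable thrown) {
        Throwable current = thrown;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** The exception type is fixed, so callers only supply the expected message fragment. */
    static void assertTableRuntimeError(Throwable thrown, String expectedMessage) {
        Throwable root = rootCause(thrown);
        if (!(root instanceof TableRuntimeException)) {
            throw new AssertionError("Root cause was " + root.getClass().getName());
        }
        String message = root.getMessage();
        if (message == null || !message.contains(expectedMessage)) {
            throw new AssertionError("Unexpected root cause message: " + message);
        }
    }
}
```

The trade-off raised in the reply still applies: hardcoding the type keeps each test spec shorter, but any future test that expects a different root cause would need the more general predicate-based variant.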
[jira] [Resolved] (FLINK-35045) Introduce ForStFileSystem to support reading and writing with ByteBuffer
[ https://issues.apache.org/jira/browse/FLINK-35045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu resolved FLINK-35045. -- Fix Version/s: 1.20.0 Resolution: Fixed merged 369bbb8f...a312a3bd into master > Introduce ForStFileSystem to support reading and writing with ByteBuffer > > > Key: FLINK-35045 > URL: https://issues.apache.org/jira/browse/FLINK-35045 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Hangxiang Yu >Assignee: Hangxiang Yu >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > As described in FLIP-427, ForStFileSystem is introduced to support reading > and writing with ByteBuffer, which will bridge between ForSt FileSystem and > Flink FileSystem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35159) CreatingExecutionGraph can leak CheckpointCoordinator and cause JM crash
Chesnay Schepler created FLINK-35159: Summary: CreatingExecutionGraph can leak CheckpointCoordinator and cause JM crash Key: FLINK-35159 URL: https://issues.apache.org/jira/browse/FLINK-35159 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.18.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.2, 1.20.0, 1.19.1 When a task manager dies while the JM is generating an ExecutionGraph in the background, {{CreatingExecutionGraph#handleExecutionGraphCreation}} can transition back into WaitingForResources if the TM hosted one of the slots that we planned to use in {{tryToAssignSlots}}. At this point the ExecutionGraph was already transitioned to running, which implicitly kicks off periodic checkpointing by the CheckpointCoordinator, without the operator coordinator holders being initialized yet (as this happens after we assigned slots). This effectively leaks that CheckpointCoordinator, including the timer thread that will continue to try triggering checkpoints, which will naturally fail to trigger. This can cause a JM crash because it results in {{OperatorCoordinatorHolder#abortCurrentTriggering}} being called, which fails with an NPE since the {{mainThreadExecutor}} was not initialized yet. 
{code}
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.NullPointerException
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$8(CheckpointCoordinator.java:707)
	at java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
	at java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:910)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:932)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
	... 7 more
Caused by: java.lang.NullPointerException
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.abortCurrentTriggering(OperatorCoordinatorHolder.java:388)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:985)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:961)
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:693)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
	... 8 more
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
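The ordering problem described in FLINK-35159 can be reduced to a few lines. The sketch below is an illustration only: `CoordinatorHolderSketch`, `initialize`, and `abortCount` are hypothetical names, and only `abortCurrentTriggering` and `mainThreadExecutor` are taken from the report. It shows why a checkpoint timer that fires before initialization hits an NPE; it does not reproduce Flink's classes or the actual fix.

```java
import java.util.concurrent.Executor;

/** Hypothetical reduction of the failure mode; not Flink's OperatorCoordinatorHolder. */
final class CoordinatorHolderSketch {

    // Assigned only after slots have been assigned, mirroring the late initialization
    // described in the report.
    private Executor mainThreadExecutor;
    private int aborts;

    /** Hypothetical initializer standing in for the step that happens after slot assignment. */
    void initialize(Executor executor) {
        this.mainThreadExecutor = executor;
    }

    /** Throws NullPointerException if a checkpoint trigger fails before initialize() ran. */
    void abortCurrentTriggering() {
        mainThreadExecutor.execute(() -> aborts++);
    }

    int abortCount() {
        return aborts;
    }
}
```

With a direct executor such as `Runnable::run`, calling `abortCurrentTriggering()` after `initialize(...)` succeeds, while calling it first fails in the same way as the last `Caused by` section of the trace. The title of the linked PR (#24680) indicates that the fix moves the transition to RUNNING into the Executing state, so that periodic checkpointing cannot start before the holders are initialized.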
Re: [PR] [FLINK-35045][state] Introduce ForStFlinkFileSystem to support reading and writing with ByteBuffer [flink]
masteryhx closed pull request #24632: [FLINK-35045][state] Introduce ForStFlinkFileSystem to support reading and writing with ByteBuffer URL: https://github.com/apache/flink/pull/24632 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35155] Introduce TableRuntimeException [flink]
flinkbot commented on PR #24679: URL: https://github.com/apache/flink/pull/24679#issuecomment-2063755509 ## CI report: * c7d65d5679cbe4bfc52db6a2a718b8a5d2515ea3 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35156][Runtime] Make operators of DataStream V2 integrate with async state processing framework [flink]
flinkbot commented on PR #24678: URL: https://github.com/apache/flink/pull/24678#issuecomment-2063745225 ## CI report: * 503f01052e16a806da7589a8dc0ed645ff00d282 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35007) Update Flink Kafka connector to support 1.19
[ https://issues.apache.org/jira/browse/FLINK-35007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838630#comment-17838630 ] yazgoo commented on FLINK-35007: Hi, Do you plan on publishing flink-connector-kafka:3.1.0-1.19 ? Thanks ! > Update Flink Kafka connector to support 1.19 > > > Key: FLINK-35007 > URL: https://issues.apache.org/jira/browse/FLINK-35007 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / Kafka >Reporter: Martijn Visser >Assignee: Martijn Visser >Priority: Major > Labels: pull-request-available > Fix For: kafka-4.0.0, kafka-3.1.1 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35155) Introduce TableRuntimeException
[ https://issues.apache.org/jira/browse/FLINK-35155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35155: --- Labels: pull-request-available (was: ) > Introduce TableRuntimeException > --- > > Key: FLINK-35155 > URL: https://issues.apache.org/jira/browse/FLINK-35155 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > The `throwException` internal function throws a {{RuntimeException}}. It > would be nice to have a specific kind of exception thrown from there, so that > it's easier to classify those. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35155] Introduce TableRuntimeException [flink]
dawidwys opened a new pull request, #24679: URL: https://github.com/apache/flink/pull/24679 ## What is the purpose of the change Introduce `TableRuntimeException` to better classify intentional exceptions thrown from SQL runtime. ## Verifying this change Added tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35156) Wire new operators for async state with DataStream V2
[ https://issues.apache.org/jira/browse/FLINK-35156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35156: --- Labels: pull-request-available (was: ) > Wire new operators for async state with DataStream V2 > - > > Key: FLINK-35156 > URL: https://issues.apache.org/jira/browse/FLINK-35156 > Project: Flink > Issue Type: Sub-task >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-35156][Runtime] Make operators of DataStream V2 integrate with async state processing framework [flink]
Zakelly opened a new pull request, #24678: URL: https://github.com/apache/flink/pull/24678 ## What is the purpose of the change This is a simple PR that wires the newly introduced operators of DataStream V2 to the `AbstractAsyncStateStreamOperator`. ## Brief change log - Introduce `AbstractAsyncStateUdfStreamOperator`, which is nearly identical to `AbstractUdfStreamOperator` but extends `AbstractAsyncStateStreamOperator` - Change the base class of `ProcessOperator` (v2) from `AbstractUdfStreamOperator` to `AbstractAsyncStateUdfStreamOperator`. ## Verifying this change This change is a trivial rework without any test coverage. More tests will be added when the whole state processing works. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-35158) Error handling in StateFuture's callback
Yanfei Lei created FLINK-35158: -- Summary: Error handling in StateFuture's callback Key: FLINK-35158 URL: https://issues.apache.org/jira/browse/FLINK-35158 Project: Flink Issue Type: Sub-task Reporter: Yanfei Lei -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-34953][ci] Add github ci for flink-web to auto commit build files [flink-web]
GOODBOY008 commented on code in PR #732: URL: https://github.com/apache/flink-web/pull/732#discussion_r1568758762 ## .github/workflows/docs.yml: ## @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: "Flink Web CI" +on: + pull_request: +branches: + - asf-site + push: +branches: + - asf-site + workflow_dispatch: + +jobs: + build-documentation: +if: github.repository == 'apache/flink-web' +runs-on: ubuntu-latest +permissions: + # Give the default GITHUB_TOKEN write permission to commit and push the changed files back to the repository. + contents: write +steps: +- name: Checkout repository + uses: actions/checkout@v4 + with: +submodules: true +fetch-depth: 0 + +- name: Setup Hugo + uses: peaceiris/actions-hugo@v3 + with: +hugo-version: '0.119.0' +extended: true + +- name: Build website + run: | +# Remove old content folder and create new one +rm -r -f content && mkdir content + +# Build the website +hugo --source docs --destination target + +# Move newly generated static HTML to the content serving folder +mv docs/target/* content + +# Copy quickstarts, rewrite rules and Google Search Console identifier +cp -r _include/. 
content + +# Get the current commit author +echo "author=$(git log -1 --pretty=\"%an <%ae>\")" >> $GITHUB_OUTPUT + +- name: Commit and push website build + if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }} Review Comment: @MartijnVisser The `push` event covers PR merges into the branch, and `workflow_dispatch` allows manually triggering a website rebuild. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][values] Temporary fix for ValuesDataSource stuck in infinite loop [flink-cdc]
yuxiqian commented on PR #3235: URL: https://github.com/apache/flink-cdc/pull/3235#issuecomment-2063673721 Fixed in #3237. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][values] Temporary fix for ValuesDataSource stuck in infinite loop [flink-cdc]
yuxiqian closed pull request #3235: [hotfix][values] Temporary fix for ValuesDataSource stuck in infinite loop URL: https://github.com/apache/flink-cdc/pull/3235
[jira] [Created] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish
Gyula Fora created FLINK-35157:

Summary: Sources with watermark alignment get stuck once some subtasks finish
Key: FLINK-35157
URL: https://issues.apache.org/jira/browse/FLINK-35157
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.18.1, 1.19.0, 1.17.2
Reporter: Gyula Fora

The current watermark alignment logic can easily get stuck if some subtasks finish while others are still running. The reason is that once a source subtask finishes, it is not excluded from alignment, which effectively blocks the rest of the job from making progress beyond the last watermark + alignment time of the finished sources.

This can be easily reproduced by the following simple pipeline:

{noformat}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

DataStream<Long> s =
        env.fromSource(
                        new NumberSequenceSource(0, 100),
                        WatermarkStrategy.<Long>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        (SerializableTimestampAssigner<Long>) (aLong, l) -> aLong)
                                .withWatermarkAlignment(
                                        "g1", Duration.ofMillis(10), Duration.ofSeconds(2)),
                        "Sequence Source")
                // Slow down processing so that the subtasks finish at different times
                .filter(
                        (FilterFunction<Long>)
                                aLong -> {
                                    Thread.sleep(200);
                                    return true;
                                });

s.print();
env.execute();
{noformat}

The solution could be to send out a max watermark event once the sources finish, or to exclude finished subtasks from alignment in the source coordinator.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
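The failure mode described in this report can be illustrated with a small plain-Java model. This is only a sketch and not Flink's coordinator code: the class `AlignmentSketch`, the method `maxAllowedWatermark`, and all numbers are hypothetical. It assumes that the alignment limit is the minimum watermark reported in the group plus the allowed drift, and shows how a finished subtask that still counts toward the minimum pins that limit.

```java
/** Hypothetical model of watermark alignment; NOT Flink's actual implementation. */
public class AlignmentSketch {

    /**
     * Returns the highest watermark any subtask may emit, modeled as
     * (minimum reported watermark of the considered subtasks) + maxDriftMs.
     *
     * @param watermarks last reported watermark per subtask
     * @param finished whether each subtask has finished
     * @param maxDriftMs allowed drift ahead of the slowest subtask
     * @param excludeFinished if true, finished subtasks no longer constrain the group
     */
    public static long maxAllowedWatermark(
            long[] watermarks, boolean[] finished, long maxDriftMs, boolean excludeFinished) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < watermarks.length; i++) {
            if (excludeFinished && finished[i]) {
                continue; // a finished subtask will never report again, so ignore it
            }
            min = Math.min(min, watermarks[i]);
        }
        // If nothing constrains the group, there is no limit (also avoids overflow).
        return min == Long.MAX_VALUE ? Long.MAX_VALUE : min + maxDriftMs;
    }

    public static void main(String[] args) {
        long[] watermarks = {50L, 60L};      // subtask 0 stopped at 50, subtask 1 is at 60
        boolean[] finished = {true, false};  // subtask 0 has finished
        long maxDriftMs = 10L;

        // Finished subtask still counted: the limit stays at 50 + 10 = 60 forever.
        System.out.println(maxAllowedWatermark(watermarks, finished, maxDriftMs, false));
        // Finished subtask excluded: the limit follows the running subtask, 60 + 10 = 70.
        System.out.println(maxAllowedWatermark(watermarks, finished, maxDriftMs, true));
    }
}
```

In this model the running subtask can never advance past 60 unless the finished subtask is excluded, or unless it reports a maximal watermark when it finishes, which are the two fixes the report proposes.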
[jira] [Updated] (FLINK-35053) TIMESTAMP with TIME ZONE not supported by JDBC connector for Postgres
[ https://issues.apache.org/jira/browse/FLINK-35053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pietro updated FLINK-35053:

Description:

The JDBC sink for Postgres does not support {{TIMESTAMP WITH TIME ZONE}}, nor {{TIMESTAMP_LTZ}} types.

Related issues: FLINK-22199, FLINK-20869

h2. Problem Explanation

A Postgres {{target_table}} has a field {{tm_tz}} of type {{timestamptz}}.

{code:sql}
-- Postgres DDL
CREATE TABLE target_table (
    tm_tz TIMESTAMP WITH TIME ZONE
)
{code}

In Flink we have a table with a column of type {{TIMESTAMP_LTZ(6)}}, and our goal is to sink it to {{target_table}}.

{code:sql}
-- Flink DDL
CREATE TABLE sink (
    tm_tz TIMESTAMP_LTZ(6)
) WITH (
    'connector' = 'jdbc',
    'table-name' = 'target_table'
    ...
)
{code}

According to [AbstractPostgresCompatibleDialect.supportedTypes()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java#L109], {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is supported, while {{TIMESTAMP_WITH_TIME_ZONE}} is not.

However, when the converter is created via [AbstractJdbcRowConverter.externalConverter()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L246], it throws an {{UnsupportedOperationException}} since {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is *not* among the available types, while [{{TIMESTAMP_WITH_TIME_ZONE}}|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L168] is.
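The mismatch described above, where one component declares a type as supported and another has no converter for it, can be sketched in isolation. The following is a simplified, hypothetical model and not the connector's code: the enum `TypeRoot`, the class `TypeSupportSketch`, and both methods are made up for illustration, and only the two type names come from the issue text.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of the dialect/converter mismatch; NOT the flink-connector-jdbc code. */
public class TypeSupportSketch {

    /** Stand-in for the two logical type roots named in the issue. */
    public enum TypeRoot {
        TIMESTAMP_WITH_TIME_ZONE,
        TIMESTAMP_WITH_LOCAL_TIME_ZONE
    }

    /** What the (modeled) dialect declares as supported, per the issue description. */
    public static final Set<TypeRoot> DIALECT_SUPPORTED =
            EnumSet.of(TypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);

    /** Modeled converter factory: handles only the type the issue says it handles. */
    public static String createConverter(TypeRoot type) {
        switch (type) {
            case TIMESTAMP_WITH_TIME_ZONE:
                return "timestamp-with-time-zone converter";
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }

    /** Types that pass the dialect's validation but fail at converter creation. */
    public static Set<TypeRoot> declaredButNotConvertible() {
        Set<TypeRoot> broken = EnumSet.noneOf(TypeRoot.class);
        for (TypeRoot type : DIALECT_SUPPORTED) {
            try {
                createConverter(type);
            } catch (UnsupportedOperationException e) {
                broken.add(type);
            }
        }
        return broken;
    }

    public static void main(String[] args) {
        // Lists the declared types for which no converter can be created.
        System.out.println(declaredButNotConvertible());
    }
}
```

A check of this shape, run over every declared type, is one way such a gap between validation and conversion could be caught in a unit test.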
{code:java}
Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported type:TIMESTAMP_LTZ(6)
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
    at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
    at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
    at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
    at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect.getRowConverter(PostgresDialect.java:51)
    at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:184)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:478)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4002)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2872)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2432)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2346)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2291)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:728)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:714)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3848)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:618)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:229)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
    at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
    at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
    at
Re: [PR] [docs]Problem with Case Document Format in Quickstart [flink-cdc]
Jiabao-Sun commented on PR #3229: URL: https://github.com/apache/flink-cdc/pull/3229#issuecomment-2063604709
> > Hi @ZmmBigdata, you don't need to submit a new PR. Just rebase your branch onto master.
> @Jiabao-Sun Ok, thank you for your reply. Will this PR still be merged? Not very familiar with this process.
Yes
Re: [PR] [docs]Problem with Case Document Format in Quickstart [flink-cdc]
ZmmBigdata commented on PR #3229: URL: https://github.com/apache/flink-cdc/pull/3229#issuecomment-2063603076
> Hi @ZmmBigdata, you don't need to submit a new PR. Just rebase your branch onto master.
@Jiabao-Sun Ok, thank you for your reply. Will this PR still be merged? Not very familiar with this process.
Re: [PR] [docs]Problem with Case Document Format in Quickstart [flink-cdc]
Jiabao-Sun commented on PR #3229: URL: https://github.com/apache/flink-cdc/pull/3229#issuecomment-2063583687
Hi @ZmmBigdata, you don't need to submit a new PR. Just rebase your branch onto master.