[jira] [Updated] (FLINK-35560) Support custom query operation validator for sql gateway
[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dongwoo.kim updated FLINK-35560:

Description:
h3. Summary

Hello, I'd like to suggest support for a custom, pluggable query operation validator in the Flink SQL gateway.

As a SQL gateway operator, I feel there is a need for operation validation to ensure that only safe operations are executed and unsafe operations are dropped.
To address this need, I propose adding an {{OperationValidator}} interface to the Flink SQL gateway API package.
This interface will allow users to implement their own query operation validation logic, which benefits Flink SQL gateway operators.

h3. Interface

Below is a draft of the interface.
It takes an {{Operation}} and checks whether it is valid.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.core.plugin.Plugin;
import org.apache.flink.table.operations.Operation;

import java.util.Optional;

public interface OperationValidator extends Plugin {
    /**
     * Validate a given operation and return an optional error string.
     *
     * @param op the operation to be validated.
     * @return Optional error string, should be present only if validation resulted in an error.
     */
    Optional<String> validate(Operation op);
}
{code}

h3. Example implementation

Below is an example implementation that inspects Kafka table options, specifically {{scan.startup.timestamp-millis}}, and rejects the operation when the value is too small (too far in the past), which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.OperationValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

import java.util.Optional;

public class KafkaTableValidator implements OperationValidator {
    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

    @Override
    public Optional<String> validate(Operation op) {
        // Only CREATE TABLE operations that use the Kafka connector are inspected.
        if (op instanceof CreateTableOperation) {
            final CreateTableOperation createTableOp = (CreateTableOperation) op;
            final String connector =
                    createTableOp.getCatalogTable().getOptions().get("connector");
            if ("kafka".equals(connector)) {
                try {
                    final long startupTimestampValue =
                            Long.parseLong(
                                    createTableOp
                                            .getCatalogTable()
                                            .getOptions()
                                            .get("scan.startup.timestamp-millis"));
                    if (startupTimestampValue < System.currentTimeMillis() - ONE_DAY) {
                        return Optional.of(
                                String.format(
                                        "Validation failed in %s: 'scan.startup.timestamp-millis' is too old. Given value: %d. It must be within one day from the current time.",
                                        KafkaTableValidator.class.getName(),
                                        startupTimestampValue));
                    }
                } catch (NumberFormatException e) {
                    // A missing (null) or non-numeric value is not rejected by this validator.
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }
}
{code}

I'm looking forward to getting feedback from the community.
Thanks

was:
h3. Summary

Hello I'd like to suggest custom pluggable query operation validator support in flink sql gateway.

As an sql gateway operator, I feel there is a need for operation validation to ensure only safe operations are executed and unsafe operations are dropped.
To address this need, I propose adding a {{OperationValidator}} interface in flink sql gateway api package.
This interface will allow users to implement their own query operation validation logic, providing benefits to flink sql gateway operators.

h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; public interface OperationValidator extends Plugin { /** * Validate a given operation and return an optional error string. * * @param op the operation to be validated. * @return Optional error string, should be present only if validation resulted in an error. */ Optional validate(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.OperationValidator;
[jira] [Updated] (FLINK-35560) Support custom query operation validator for sql gateway
[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-35560: Description: h3. Summary Hello I'd like to suggest custom pluggable query operation validator support in flink sql gateway. As an sql gateway operator, I feel there is a need for operation validation to ensure only safe operations are executed and unsafe operations are dropped. To address this need, I propose adding a {{OperationValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query operation validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; public interface OperationValidator extends Plugin { /** * Validate a given operation and return an optional error string. * * @param op the operation to be validated. * @return Optional error string, should be present only if validation resulted in an error. */ Optional validate(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. 
{code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.OperationValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTableValidator implements OperationValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public Optional validate(Operation op) { if (op instanceof CreateTableOperation) { final CreateTableOperation createTableOp = (CreateTableOperation) op; final String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { try { final long startupTimestampValue = Long.parseLong( createTableOp .getCatalogTable() .getOptions() .get("scan.startup.timestamp-millis")); if (startupTimestampValue < System.currentTimeMillis() - ONE_DAY) { return Optional.of( String.format( "Validation failed in %s: 'scan.startup.timestamp-millis' is too old. Given value: %d. It must be within one day from the current time.", KafkaTableValidator.class.getName(), startupTimestampValue)); } } catch (NumberFormatException e) { return Optional.empty(); } } } return Optional.empty(); } }{code} I'm looking forward to get feedback from the community. Thanks was: h3. Summary Hello I'd like to suggest custom pluggable query operation validator support in flink sql gateway. As an sql gateway operator, there is need for query operation validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{OperationValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query operation validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. 
{code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; public interface OperationValidator extends Plugin { /** * Validate a given operation and return an optional error string. * * @param op the operation to be validated. * @return Optional error string, should be present only if validation resulted in an error. */ Optional validate(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.OperationValidator; import org.apache.flink.table.operations.Operation;
[jira] [Updated] (FLINK-35560) Support custom query operation validator for sql gateway
[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-35560: Description: h3. Summary Hello I'd like to suggest custom pluggable query operation validator support in flink sql gateway. As an sql gateway operator, there is need for query operation validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{OperationValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query operation validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; public interface OperationValidator extends Plugin { /** * Validate a given operation and return an optional error string. * * @param op the operation to be validated. * @return Optional error string, should be present only if validation resulted in an error. */ Optional validate(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. 
{code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.OperationValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTableValidator implements OperationValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public Optional validate(Operation op) { if (op instanceof CreateTableOperation) { final CreateTableOperation createTableOp = (CreateTableOperation) op; final String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { try { final long startupTimestampValue = Long.parseLong( createTableOp .getCatalogTable() .getOptions() .get("scan.startup.timestamp-millis")); if (startupTimestampValue < System.currentTimeMillis() - ONE_DAY) { return Optional.of( String.format( "Validation failed in %s: 'scan.startup.timestamp-millis' is too old. Given value: %d. It must be within one day from the current time.", KafkaTableValidator.class.getName(), startupTimestampValue)); } } catch (NumberFormatException e) { return Optional.empty(); } } } return Optional.empty(); } }{code} I'm looking forward to get feedback from the community. Thanks was: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. 
{code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override
[jira] [Updated] (FLINK-35560) Support custom query operation validator for sql gateway
[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dongwoo.kim updated FLINK-35560:

Summary: Support custom query operation validator for sql gateway (was: Add query validator support to flink sql gateway via spi pattern)

> Support custom query operation validator for sql gateway
>
>
> Key: FLINK-35560
> URL: https://issues.apache.org/jira/browse/FLINK-35560
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Gateway
> Reporter: dongwoo.kim
> Priority: Major
> Labels: pull-request-available
>
> h3. Summary
> Hello I'd like to suggest query validator support in flink sql gateway via spi pattern.
> As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries.
> To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package.
> This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators.
> h3. Interface
> Below is a draft for the interface.
> It takes Operation and check whether the query is valid or not.
> {code:java}
> package org.apache.flink.table.gateway.api.validator;
>
> import org.apache.flink.annotation.Public;
> import org.apache.flink.table.operations.Operation;
>
> /**
>  * Interface for implementing a validator that checks the safety of executing queries.
>  */
> @Public
> public interface QueryValidator {
>     boolean validateQuery(Operation op);
> }
> {code}
> h3. Example implementation
> Below is an example implementation that inspects Kafka table options, specifically {{scan.startup.timestamp-millis}}, and reject when the value is too small, which can cause high disk I/O load.
> {code:java}
> package org.apache.flink.table.gateway.api.validator;
>
> import org.apache.flink.table.gateway.api.validator.QueryValidator;
> import org.apache.flink.table.operations.Operation;
> import org.apache.flink.table.operations.ddl.CreateTableOperation;
>
> public class KafkaTimestampValidator implements QueryValidator {
>     private static final long ONE_DAY = 24 * 60 * 60 * 1000L;
>
>     @Override
>     public boolean validateQuery(Operation op) {
>         if (op instanceof CreateTableOperation) {
>             CreateTableOperation createTableOp = (CreateTableOperation) op;
>             String connector = createTableOp.getCatalogTable().getOptions().get("connector");
>             if ("kafka".equals(connector)) {
>                 String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
>                 if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
>                     return false;
>                 }
>             }
>         }
>         return true;
>     }
> }
> {code}
> I'd be happy to implement this feature, if we can reach on agreement.
> Thanks.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
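The timestamp rule in the example validators can also be tried out without a running gateway by pulling it out of the Flink types. The following is only a sketch under stated assumptions: {{KafkaTimestampRule}}, {{check}} and the constant names are hypothetical and are not part of Flink or of the proposal. The current time is passed in as a parameter so the rule is deterministic, and a missing or non-numeric value is treated as "no objection", as in the {{Optional}}-returning variant of the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical helper (not part of Flink): the "startup timestamp must not be
 * older than one day" rule from the example, expressed over a plain options map.
 */
final class KafkaTimestampRule {
    static final long ONE_DAY_MILLIS = 24 * 60 * 60 * 1000L;
    static final String CONNECTOR_KEY = "connector";
    static final String TIMESTAMP_KEY = "scan.startup.timestamp-millis";

    private KafkaTimestampRule() {}

    /**
     * Returns an error message if the table options describe a Kafka table whose
     * startup timestamp is more than one day before {@code nowMillis}; otherwise empty.
     */
    static Optional<String> check(Map<String, String> options, long nowMillis) {
        if (!"kafka".equals(options.get(CONNECTOR_KEY))) {
            return Optional.empty(); // other connectors are out of scope for this rule
        }
        final String raw = options.get(TIMESTAMP_KEY);
        if (raw == null) {
            return Optional.empty(); // option not set: nothing to validate
        }
        final long timestamp;
        try {
            timestamp = Long.parseLong(raw);
        } catch (NumberFormatException e) {
            return Optional.empty(); // assumption: malformed values are left to other checks
        }
        if (timestamp < nowMillis - ONE_DAY_MILLIS) {
            return Optional.of(
                    String.format(
                            "'%s' is too old. Given value: %d. It must be within one day from the current time.",
                            TIMESTAMP_KEY, timestamp));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        final long now = System.currentTimeMillis();
        final Map<String, String> options = new HashMap<>();
        options.put(CONNECTOR_KEY, "kafka");
        options.put(TIMESTAMP_KEY, String.valueOf(now - 2 * ONE_DAY_MILLIS));
        // Prints the error message if present, otherwise "accepted".
        System.out.println(check(options, now).orElse("accepted"));
    }
}
```

A validator implementation could then delegate to such a helper with the options of the {{CreateTableOperation}} and {{System.currentTimeMillis()}}, which keeps the rule itself easy to unit-test.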
[jira] [Comment Edited] (FLINK-35565) Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
[ https://issues.apache.org/jira/browse/FLINK-35565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860998#comment-17860998 ]

dongwoo.kim edited comment on FLINK-35565 at 6/30/24 1:52 PM:
--

Since the stoppingOffset logic relies on the last record's offset, it can fall into an indefinite loop if all records are deleted.
This [pr|https://github.com/apache/flink-connector-kafka/pull/100] can resolve the issue. Please take a look.

was (Author: JIRAUSER300481):
Since the stoppingOffset logic relies on the last record's offset, it can fall into an indefinite loop if all records are deleted.
This [pr|https://github.com/apache/flink-connector-kafka/pull/100] resolves the issue. Please take a look.

> Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
> --
>
> Key: FLINK-35565
> URL: https://issues.apache.org/jira/browse/FLINK-35565
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: kafka-3.1.0
> Environment: This is reproduced on a *Flink 1.18.1* with the latest Kafka connector 3.1.0-1.18 on a session cluster.
> Reporter: Naci Simsek
> Priority: Major
> Attachments: image-2024-06-11-11-19-09-889.png, taskmanager_localhost_54489-ac092a_log.txt
>
>
> h2. Summary
> Flink batch job gets into an infinite fetch loop and could not gracefully finish if the connected Kafka topic is empty and starting offset value in Flink job is lower than the current start/end offset of the related topic.
> See below for details:
> h2. How to reproduce
> Flink +*batch*+ job which works as a {*}KafkaSource{*}, will consume events from Kafka topic.
> Related Kafka topic is empty, there are no events, and the offset value is as below: *15*
> !image-2024-06-11-11-19-09-889.png|width=895,height=256!
>
> Flink job uses a *specific starting offset* value, which is +*less*+ than the current offset of the topic/partition.
> See below, it is set to "4":
>
> {code:java}
> package naci.grpId;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.kafka.common.TopicPartition;
>
> import java.util.HashMap;
> import java.util.Map;
>
> public class KafkaSource_Print {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
>         // Define the specific offsets for the partitions
>         Map<TopicPartition, Long> specificOffsets = new HashMap<>();
>         specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // Start from offset 4 for partition 0
>
>         KafkaSource<String> kafkaSource = KafkaSource
>                 .<String>builder()
>                 .setBootstrapServers("localhost:9093") // Make sure the port is correct
>                 .setTopics("topic_test")
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
>                 .setBounded(OffsetsInitializer.latest())
>                 .build();
>
>         DataStream<String> stream = env.fromSource(
>                 kafkaSource,
>                 WatermarkStrategy.noWatermarks(),
>                 "Kafka Source"
>         );
>         stream.print();
>         env.execute("Flink KafkaSource test job");
>     }
> }
> {code}
>
> Here are the initial logs printed related to the offset, as soon as the job gets submitted:
>
> {code:java}
> 2024-05-30 12:15:50,010 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]
> 2024-05-30 12:15:50,069 DEBUG >
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - > Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, > StoppingOffset: 15]]] > 2024-05-30 12:15:50,074 TRACE > org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - > Seeking starting offsets to specified offsets: {topic_test-0=4} > 2024-05-30 12:15:50,074 INFO org.apache.kafka.clients.consumer.KafkaConsumer > [] - [Consumer clientId=KafkaSource--2381765882724812354-0, > groupId=null] Seeking to offset 4 for partition topic_test-0 >
[jira] [Commented] (FLINK-35565) Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
[ https://issues.apache.org/jira/browse/FLINK-35565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860998#comment-17860998 ]

dongwoo.kim commented on FLINK-35565:
-

Since the stoppingOffset logic relies on the last record's offset, it can fall into an indefinite loop if all records are deleted.
This [pr|https://github.com/apache/flink-connector-kafka/pull/100] resolves the issue. Please take a look.

> Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
> --
>
> Key: FLINK-35565
> URL: https://issues.apache.org/jira/browse/FLINK-35565
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: kafka-3.1.0
> Environment: This is reproduced on a *Flink 1.18.1* with the latest Kafka connector 3.1.0-1.18 on a session cluster.
> Reporter: Naci Simsek
> Priority: Major
> Attachments: image-2024-06-11-11-19-09-889.png, taskmanager_localhost_54489-ac092a_log.txt
>
>
> h2. Summary
> Flink batch job gets into an infinite fetch loop and could not gracefully finish if the connected Kafka topic is empty and starting offset value in Flink job is lower than the current start/end offset of the related topic.
> See below for details:
> h2. How to reproduce
> Flink +*batch*+ job which works as a {*}KafkaSource{*}, will consume events from Kafka topic.
> Related Kafka topic is empty, there are no events, and the offset value is as below: *15*
> !image-2024-06-11-11-19-09-889.png|width=895,height=256!
>
> Flink job uses a *specific starting offset* value, which is +*less*+ than the current offset of the topic/partition.
> See below, it is set to "4":
>
> {code:java}
> package naci.grpId;
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.kafka.common.TopicPartition;
>
> import java.util.HashMap;
> import java.util.Map;
>
> public class KafkaSource_Print {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
>         // Define the specific offsets for the partitions
>         Map<TopicPartition, Long> specificOffsets = new HashMap<>();
>         specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // Start from offset 4 for partition 0
>
>         KafkaSource<String> kafkaSource = KafkaSource
>                 .<String>builder()
>                 .setBootstrapServers("localhost:9093") // Make sure the port is correct
>                 .setTopics("topic_test")
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
>                 .setBounded(OffsetsInitializer.latest())
>                 .build();
>
>         DataStream<String> stream = env.fromSource(
>                 kafkaSource,
>                 WatermarkStrategy.noWatermarks(),
>                 "Kafka Source"
>         );
>         stream.print();
>         env.execute("Flink KafkaSource test job");
>     }
> }
> {code}
>
> Here are the initial logs printed related to the offset, as soon as the job gets submitted:
>
> {code:java}
> 2024-05-30 12:15:50,010 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, StoppingOffset: 15]]
> 2024-05-30 12:15:50,069 DEBUG >
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - > Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, > StoppingOffset: 15]]] > 2024-05-30 12:15:50,074 TRACE > org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - > Seeking starting offsets to specified offsets: {topic_test-0=4} > 2024-05-30 12:15:50,074 INFO org.apache.kafka.clients.consumer.KafkaConsumer > [] - [Consumer clientId=KafkaSource--2381765882724812354-0, > groupId=null] Seeking to offset 4 for partition topic_test-0 > 2024-05-30 12:15:50,075 DEBUG > org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - > SplitsChange handling result: [topic_test-0, start:4, stop: 15] > 2024-05-30 12:15:50,075 DEBUG > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - > Finished running task AddSplitsTask:
[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern
[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-35560: Description: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. 
{code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } return true; } }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks. was: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. 
Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } return true; } }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks h4. > Add query validator support to flink sql gateway via spi pattern >
[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern
[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-35560: Description: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. 
{code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } } return true; }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks h4. was: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. 
Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. {code:sql} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } } return true; }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks h4. > Add query validator support to flink sql gateway via spi pattern >
[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern
[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-35560: Description: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. 
{code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } return true; } }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks h4. was: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. 
Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } } return true; }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks h4. > Add query validator support to flink sql gateway via spi pattern >
[jira] [Created] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern
dongwoo.kim created FLINK-35560:
---

Summary: Add query validator support to flink sql gateway via spi pattern
Key: FLINK-35560
URL: https://issues.apache.org/jira/browse/FLINK-35560
Project: Flink
Issue Type: Improvement
Components: Table SQL / Gateway
Reporter: dongwoo.kim

h3. Summary

Hello, I'd like to suggest adding query validator support to the Flink SQL gateway via the SPI pattern.

As a SQL gateway operator, I see a need for query validation so that only safe queries are executed and unsafe queries are dropped.
To address this need, I propose adding a {{QueryValidator}} interface to the Flink SQL gateway API package.
This interface will allow users to implement their own query validation logic, which benefits Flink SQL gateway operators.

h3. Interface

Below is a draft of the interface. It takes an {{Operation}} and checks whether the query is valid.

{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing queries.
 */
@Public
public interface QueryValidator {
    boolean validateQuery(Operation op);
}
{code}

h3. Example implementation

Below is an example implementation that inspects Kafka table options, specifically {{scan.startup.timestamp-millis}}, and rejects the query when the value is too small, since starting from a very old timestamp can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

    @Override
    public boolean validateQuery(Operation op) {
        if (op instanceof CreateTableOperation) {
            CreateTableOperation createTableOp = (CreateTableOperation) op;
            String connector = createTableOp.getCatalogTable().getOptions().get("connector");
            if ("kafka".equals(connector)) {
                String startupTimestamp =
                        createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
                // Reject tables whose startup timestamp is more than one day in the past.
                if (startupTimestamp != null
                        && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
                    return false;
                }
            }
        }
        return true;
    }
}
{code}

I'd be happy to implement this feature if we can reach an agreement.

Thanks

-- This message was sent by Atlassian Jira (v8.20.10#820010)
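The example implementation above calls {{Long.parseLong}} without guarding against a malformed option value, and it reads the clock directly, which makes it hard to test. The following is a minimal sketch of the same check under stated assumptions: the class name {{KafkaStartupTimestampCheck}} and the map-based {{isSafe}} signature are hypothetical stand-ins that operate on raw table options rather than on Flink's {{Operation}}, and treating a malformed value as "safe" is an assumption, not something the proposal specifies.

```java
import java.util.Map;

/** Hypothetical stand-in for the proposed check; works on raw table options, not on Flink's Operation. */
final class KafkaStartupTimestampCheck {

    private static final long ONE_DAY_MILLIS = 24 * 60 * 60 * 1000L;

    private KafkaStartupTimestampCheck() {}

    /**
     * Returns false only for Kafka tables whose 'scan.startup.timestamp-millis'
     * lies more than one day before nowMillis.
     */
    static boolean isSafe(Map<String, String> options, long nowMillis) {
        if (!"kafka".equals(options.get("connector"))) {
            return true; // not a Kafka table, nothing to check
        }
        String raw = options.get("scan.startup.timestamp-millis");
        if (raw == null) {
            return true; // option not set
        }
        try {
            return Long.parseLong(raw.trim()) >= nowMillis - ONE_DAY_MILLIS;
        } catch (NumberFormatException e) {
            // Assumption: a malformed value is left for the connector's own option validation to reject.
            return true;
        }
    }
}
```

Passing the current time as a parameter keeps the check deterministic, so the one-day boundary can be exercised in a unit test without mocking the clock.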
[jira] [Commented] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851305#comment-17851305 ]

dongwoo.kim commented on FLINK-34470:
-

Hello [~m.orazow], thanks for the review.
I've added integration test code for this case and would be happy to get feedback on the added tests.
Thanks.

Best,
Dongwoo

> Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: kafka-3.1.0
> Reporter: dongwoo.kim
> Priority: Major
> Labels: pull-request-available
>
> h2. Summary
> Hi, we have run into an issue with transactional messages and the Table API Kafka source.
> If we configure *'scan.bounded.mode'* to *'latest-offset'*, the Flink SQL request hangs and eventually times out. We can always reproduce this behavior by following the steps below.
> This is also related to this [issue|https://issues.apache.org/jira/browse/FLINK-33484].
> h2. How to reproduce
> 1. Deploy a transactional producer and stop it after it has produced a certain number of messages.
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple query, such as counting the produced messages.
> 3. The Flink SQL job gets stuck and times out.
> h2. Cause
> A transactional producer always writes [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of each transaction, and these control records are never returned by {*}consumer.poll(){*} (they are filtered internally).
> In the *KafkaPartitionSplitReader* code, a split is finished only when the condition *lastRecord.offset() >= stoppingOffset - 1* is met. When the record just before the stopping offset is a control record, the last record returned by poll has a smaller offset, so the condition is never satisfied.
> This works for non-transactional messages and for streaming jobs, but in some batch use cases it causes the job to hang.
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
>     recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
>     finishSplitAtRecord(
>             tp,
>             stoppingOffset,
>             lastRecord.offset(),
>             finishedPartitions,
>             recordsBySplits);
> }
> {code}
> Replacing the if condition with *consumer.position(tp) >= stoppingOffset* [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] can solve the problem.
> *consumer.position(tp)* returns the offset of the next record that will be fetched, so it advances past control records even though they are never returned by *consumer.poll()*.
> With this change, KafkaPartitionSplitReader is able to finish the split even when the record just before the stopping offset is a control record.
> I would be happy to implement this fix if we can reach an agreement.
> Thanks
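To make the difference between the two conditions concrete, here is a small Kafka-free sketch. Everything in it is illustrative: the class {{SplitFinishCheck}}, its method names, and the offsets are hypothetical, and the {{position}} value simply models the assumption that the consumer position has already moved past the trailing control record.

```java
/** Hypothetical, Kafka-free model of the two split-finish conditions discussed above. */
final class SplitFinishCheck {

    private SplitFinishCheck() {}

    /** Condition described under "Cause": relies on the last record returned by poll(). */
    static boolean finishedByLastRecord(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1;
    }

    /** Condition from "Proposed solution": relies on the consumer position. */
    static boolean finishedByPosition(long consumerPosition, long stoppingOffset) {
        return consumerPosition >= stoppingOffset;
    }

    public static void main(String[] args) {
        // Assumed partition layout: data records at offsets 0..4, a commit control record at offset 5.
        long lastDataOffset = 4L; // last record that poll() returns
        long stoppingOffset = 6L; // 'latest-offset' resolves to the log end offset
        long position = 6L;       // assumed: the position has moved past the control record

        System.out.println(finishedByLastRecord(lastDataOffset, stoppingOffset)); // false
        System.out.println(finishedByPosition(position, stoppingOffset));         // true
    }
}
```

With a non-transactional producer the last data record would sit at offset 5, so both conditions would hold; the two only diverge when a control record occupies the final offset.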
[jira] [Commented] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841823#comment-17841823 ] dongwoo.kim commented on FLINK-34470: - Hi [~m.orazow], I’ve just made a PR and would appreciate your review I also left a comment about the metric issue in the discussion area and would appreciate any feedback on that. Thanks Best, Dongwoo
[jira] [Comment Edited] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837027#comment-17837027 ] dongwoo.kim edited comment on FLINK-34470 at 4/15/24 1:15 AM: -- Hello [~m.orazow], I'm glad that you're interested in collaborating, I’ll send a pr soon and keep you updated. Thanks for reaching out. Best, Dongwoo was (Author: JIRAUSER300481): Hello [~m.orazow], I'm glad that you're interested in collaborating, I’ll send a pr soon and keep you updated. Thanks for reaching out. Best regards, Dongwoo > Transactional message + Table api kafka source with 'latest-offset' scan > bound mode causes indefinitely hanging > --- > > Key: FLINK-34470 > URL: https://issues.apache.org/jira/browse/FLINK-34470 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.17.1 >Reporter: dongwoo.kim >Priority: Major > > h2. Summary > Hi we have faced issue with transactional message and table api kafka source. > If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's > request timeouts after hanging. We can always reproduce this unexpected > behavior by following below steps. > This is related to this > [issue|https://issues.apache.org/jira/browse/FLINK-33484] too. > h2. How to reproduce > 1. Deploy transactional producer and stop after producing certain amount of > messages > 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple > query such as getting count of the produced messages > 3. Flink sql job gets stucked and timeouts. > h2. Cause > Transaction producer always produces [control > records|https://kafka.apache.org/documentation/#controlbatch] at the end of > the transaction. And these control messages are not polled by > {*}consumer.poll(){*}. (It is filtered internally). In > *KafkaPartitionSplitReader* code, split is finished only when > *lastRecord.offset() >= stoppingOffset - 1* condition is met. 
This might work > well with non transactional messages or streaming environment but in some > batch use cases it causes unexpected hanging. > h2. Proposed solution > {code:java} > if (consumer.position(tp) >= stoppingOffset) { > recordsBySplits.setPartitionStoppingOffset(tp, > stoppingOffset); > finishSplitAtRecord( > tp, > stoppingOffset, > lastRecord.offset(), > finishedPartitions, > recordsBySplits); > } > {code} > Replacing if condition to *consumer.position(tp) >= stoppingOffset* in > [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] > can solve the problem. > *consumer.position(tp)* gets next record's offset if it exist and the last > record's offset if the next record doesn't exist. > By this KafkaPartitionSplitReader is available to finish the split even when > the stopping offset is configured to control record's offset. > I would be happy to implement about this fix if we can reach on agreement. > Thanks -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837027#comment-17837027 ] dongwoo.kim commented on FLINK-34470: - Hello [~m.orazow], I'm glad that you're interested in collaborating, I’ll send a pr soon and keep you updated. Thanks for reaching out. Best regards, Dongwoo
[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-34470: Description: h2. Summary Hi we have faced issue with transactional message and table api kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's request timeouts after hanging. We can always reproduce this unexpected behavior by following below steps. This is related to this [issue|https://issues.apache.org/jira/browse/FLINK-33484] too. h2. How to reproduce 1. Deploy transactional producer and stop after producing certain amount of messages 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query such as getting count of the produced messages 3. Flink sql job gets stucked and timeouts. h2. Cause Transaction producer always produces [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of the transaction. And these control messages are not polled by {*}consumer.poll(){*}. (It is filtered internally). In *KafkaPartitionSplitReader* code, split is finished only when *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work well with non transactional messages or streaming environment but in some batch use cases it causes unexpected hanging. h2. Proposed solution {code:java} if (consumer.position(tp) >= stoppingOffset) { recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); finishSplitAtRecord( tp, stoppingOffset, lastRecord.offset(), finishedPartitions, recordsBySplits); } {code} Replacing if condition to *consumer.position(tp) >= stoppingOffset* in [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] can solve the problem. *consumer.position(tp)* gets next record's offset if it exist and the last record's offset if the next record doesn't exist. 
By this KafkaPartitionSplitReader is available to finish the split even when the stopping offset is configured to control record's offset. I would be happy to implement about this fix if we can reach on agreement. Thanks was: h2. Summary Hi we have faced issue with transactional message and table api kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's request timeouts after hanging. We can always reproduce this unexpected behavior by following below steps. This is related to this [issue|https://issues.apache.org/jira/browse/FLINK-33484] too. h2. How to reproduce 1. Deploy transactional producer and stop after producing certain amount of messages 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query such as getting count of the produced messages 3. Flink sql job gets stucked and timeouts. h2. Cause Transaction producer always produces [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of the transaction. And these control messages are not polled by {*}consumer.poll(){*}. (It is filtered internally). In *KafkaPartitionSplitReader* code split is finished only when *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work well with non transactional messages or streaming environment but in some batch use cases it causes unexpected hanging. h2. Proposed solution {code:java} if (consumer.position(tp) >= stoppingOffset) { recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); finishSplitAtRecord( tp, stoppingOffset, lastRecord.offset(), finishedPartitions, recordsBySplits); } {code} Replacing if condition to *consumer.position(tp) >= stoppingOffset* in [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] can solve the problem. 
*consumer.position(tp)* gets next record's offset if it exist and the last record's offset if the next record doesn't exist. By this KafkaPartitionSplitReader is available to finish the split even when the stopping offset is configured to control record's offset. I would be happy to implement about this fix if we can reach on agreement. Thanks > Transactional message + Table api kafka source with 'latest-offset' scan > bound mode causes indefinitely hanging >
[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-34470: Description: h2. Summary Hi we have faced issue with transactional message and table api kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's request timeouts after hanging. We can always reproduce this unexpected behavior by following below steps. This is related to this [issue|https://issues.apache.org/jira/browse/FLINK-33484] too. h2. How to reproduce 1. Deploy transactional producer and stop after producing certain amount of messages 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query such as getting count of the produced messages 3. Flink sql job gets stucked and timeouts. h2. Cause Transaction producer always produces [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of the transaction. And these control messages are not polled by {*}consumer.poll(){*}. (It is filtered internally). In *KafkaPartitionSplitReader* code split is finished only when *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work well with non transactional messages or streaming environment but in some batch use cases it causes unexpected hanging. h2. Proposed solution {code:java} if (consumer.position(tp) >= stoppingOffset) { recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); finishSplitAtRecord( tp, stoppingOffset, lastRecord.offset(), finishedPartitions, recordsBySplits); } {code} Replacing if condition to *consumer.position(tp) >= stoppingOffset* in [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] can solve the problem. *consumer.position(tp)* gets next record's offset if it exist and the last record's offset if the next record doesn't exist. 
By this KafkaPartitionSplitReader is available to finish the split even when the stopping offset is configured to control record's offset. I would be happy to implement about this fix if we can reach on agreement. Thanks was: h2. Summary Hi we have faced issue with transactional message and table api kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's request timeouts after hanging. We can always reproduce this unexpected behavior by following below steps. This is related to this [issue|https://issues.apache.org/jira/browse/FLINK-33484] too. h2. How to reproduce 1. Deploy transactional producer and stop after producing certain amount of messages 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query such as getting count of the produced messages 3. Flink sql job gets stucked and timeouts. h2. Cause Transaction producer always produces [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of the transaction. And these control messages are not polled by {*}consumer.poll(){*}. (It is filtered internally). In *KafkaPartitionSplitReader* code split is finished only when *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work well with non transactional messages or streaming environment but in some batch use cases it causes unexpected hanging. h2. Proposed solution {code:java} if (consumer.position(tp) >= stoppingOffset) { recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); finishSplitAtRecord( tp, stoppingOffset, lastRecord.offset(), finishedPartitions, recordsBySplits); } {code} Replacing if condition to *consumer.position(tp) >= stoppingOffset* in [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] can solve the problem. 
*consumer.position(tp)* gets next record's offset if it exist and the last record's offset if the next record doesn't exist. By this KafkaPartitionSplitReader is available to finish the split even when the stopping offset is configured to control record's offset. I would be happy to implement about this fix if we can reach on agreement. Thanks > Transactional message + Table api kafka source with 'latest-offset' scan > bound mode causes indefinitely hanging >
[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-34470: Description: h2. Summary Hi we have faced issue with transactional message and table api kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's request timeouts after hanging. We can always reproduce this unexpected behavior by following below steps. This is related to this [issue|https://issues.apache.org/jira/browse/FLINK-33484] too. h2. How to reproduce 1. Deploy transactional producer and stop after producing certain amount of messages 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query such as getting count of the produced messages 3. Flink sql job gets stucked and timeouts. h2. Cause Transaction producer always produces [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of the transaction. And these control messages are not polled by {*}consumer.poll(){*}. (It is filtered internally). In *KafkaPartitionSplitReader* code split is finished only when *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work well with non transactional messages or streaming environment but in some batch use cases it causes unexpected hanging. h2. Proposed solution {code:java} if (consumer.position(tp) >= stoppingOffset) { recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); finishSplitAtRecord( tp, stoppingOffset, lastRecord.offset(), finishedPartitions, recordsBySplits); } {code} Replacing if condition to *consumer.position(tp) >= stoppingOffset* in [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] can solve the problem. *consumer.position(tp)* gets next record's offset if it exist and the last record's offset if the next record doesn't exist. 
With this change, KafkaPartitionSplitReader can finish the split even when the stopping offset is set at a control record's offset.

I would be happy to implement this fix if we can reach an agreement.
Thanks

> Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.17.1
> Reporter: dongwoo.kim
> Priority: Major
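The difference between the two conditions discussed in this issue can be illustrated with a small sketch. It is a toy model only and does not use the real Kafka or Flink classes: the class and method names are hypothetical, and the offsets are assumed values (data records at offsets 0 to 4, one control record at offset 5, so the stopping offset for 'latest-offset' is 6). It also assumes that the consumer position has already moved past the control record.

```java
/**
 * Toy model of one partition, NOT the real KafkaPartitionSplitReader.
 * Assumed layout: offsets 0..4 are data records, offset 5 is the
 * transaction's control record, so the end (stopping) offset is 6.
 */
class StoppingOffsetSketch {

    /** The check quoted in the issue as the current behavior. */
    static boolean finishedByLastRecord(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1;
    }

    /** The check proposed in the issue. */
    static boolean finishedByPosition(long consumerPosition, long stoppingOffset) {
        return consumerPosition >= stoppingOffset;
    }

    public static void main(String[] args) {
        long stoppingOffset = 6L;        // assumed end offset of the partition
        long lastDataRecordOffset = 4L;  // last record that poll() hands back
        long positionAfterPoll = 6L;     // assumed: position is past the control record

        // 4 >= 5 is false, so the split would never be marked as finished.
        System.out.println("lastRecord check finishes split: "
                + finishedByLastRecord(lastDataRecordOffset, stoppingOffset));
        // 6 >= 6 is true, so the split can be finished.
        System.out.println("position check finishes split: "
                + finishedByPosition(positionAfterPoll, stoppingOffset));
    }
}
```

For a partition without control records (last data record at offset 5, stopping offset 6), both checks return true, which matches the observation that the existing condition works for non-transactional messages.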
[jira] [Comment Edited] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818500#comment-17818500 ]

dongwoo.kim edited comment on FLINK-34470 at 2/19/24 3:12 PM:
--

[~martijnvisser] I used the latest version (flink-sql-connector-kafka-3.1.0-1.18.jar) and verified that the issue still exists. This [line|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137] seems to be causing the issue.

> Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.17.1
> Reporter: dongwoo.kim
> Priority: Major

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
dongwoo.kim created FLINK-34470:
---

Summary: Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging
Key: FLINK-34470
URL: https://issues.apache.org/jira/browse/FLINK-34470
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.17.1
Reporter: dongwoo.kim

h2. Summary

Hi, we have run into an issue with transactional messages and the Table API Kafka source. If we configure *'scan.bounded.mode'* to *'latest-offset'*, the Flink SQL request hangs and eventually times out. We can always reproduce this unexpected behavior by following the steps below.

h2. How to reproduce

1. Deploy a transactional producer and stop it after it has produced a certain number of messages.
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple query such as count(*).
3. The Flink SQL job gets stuck and times out.

h2. Cause

A transactional producer always writes [control records|https://kafka.apache.org/documentation/#controlbatch] at the end of a transaction. These control records are not returned by {*}consumer.poll(){*} (they are filtered out internally). In the *KafkaPartitionSplitReader* code, a split is finished only when the condition *lastRecord.offset() >= stoppingOffset - 1* is met. This might work well with non-transactional messages or in a streaming environment, but in some batch use cases it causes unexpected hanging.

h2. Proposed solution

Add the condition *consumer.position(tp) >= stoppingOffset* to the if statement. With this change, KafkaPartitionSplitReader can finish the split even when the stopping offset is set at a control record's offset.
[jira] [Commented] (FLINK-33454) Adding tls configuration to IngressSpec
[ https://issues.apache.org/jira/browse/FLINK-33454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796210#comment-17796210 ]

dongwoo.kim commented on FLINK-33454:
-

[~ryanvanhuuksloot] Sorry for the late reply, and thanks for your efforts :)

> Adding tls configuration to IngressSpec
> ---
>
> Key: FLINK-33454
> URL: https://issues.apache.org/jira/browse/FLINK-33454
> Project: Flink
> Issue Type: New Feature
> Components: Kubernetes Operator
> Reporter: dongwoo.kim
> Assignee: Ryan van Huuksloot
> Priority: Minor
> Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
> Hello, I want to propose a new configuration parameter for IngressSpec.
> Currently the Flink k8s operator creates the ingress resource as we define it, but it doesn't support TLS configuration to secure the ingress.
> How about adding a tls parameter to IngressSpec?
> *IngressSpec*
> tls: IngressTLS
> *IngressTLSSpec*
> Hosts: List
> SecretName: String
> If we can reach an agreement, I'll be glad to take on the implementation.
> Thanks in advance.
[jira] [Updated] (FLINK-33715) Enhance history server to archive multiple histories per jobid
[ https://issues.apache.org/jira/browse/FLINK-33715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-33715: Description: Hello Flink team, I'd like to propose an improvement to how the job manager archives job histories and how the Flink history server fetches them. *Currently, only one job history per jobid can be archived and fetched.* When a Flink job tries to archive its history more than once, a 'FileAlreadyExistsException' error usually occurs. This makes sense in most cases, since a job typically gets a new ID when it is restarted from the latest checkpoint/savepoint. *_However, there's a specific situation where this behavior can be problematic:_* *_1) When we upgrade a job using the savepoint mode, the job's first history gets successfully archived._* *_2) If the same job later fails due to an error, its history isn't archived again because there's already a record with the same job ID._* This can be an issue because the most valuable information – why the job failed – gets lost. As a simple solution, I suggest including currentTimeMillis in the history filename along with the jobid ( \{jobid}-\{currentTimeMillis} ). On the history fetching side, we would parse the jobid before the *"-"* delimiter and fetch all the histories for that jobid. For the UI, we can keep the current display or enhance it by adding an extra hierarchy level for each jobid, since each jobid can now have multiple histories. If we can reach an agreement, I'll be glad to take on the implementation. Thanks in advance.
was: Hello Flink team, I'd like to propose an improvement to how the job manager archives job histories and how the Flink history server fetches them. Currently, only one job history per jobid can be archived and fetched. When a Flink job tries to archive its history more than once, a 'FileAlreadyExistsException' error usually occurs. This makes sense in most cases, since a job typically gets a new ID when it is restarted from the latest checkpoint/savepoint. However, there's a specific situation where this behavior can be problematic: 1) When we upgrade a job using the savepoint mode, the job's first history gets successfully archived. 2) If the same job later fails due to an error, its history isn't archived again because there's already a record with the same job ID. This can be an issue because the most valuable information – why the job failed – gets lost. As a simple solution, I suggest including currentTimeMillis in the history filename along with the jobid ( \{jobid}-\{currentTimeMillis} ). On the history fetching side, we would parse the jobid before the *"-"* delimiter and fetch all the histories for that jobid. For the UI, we can keep the current display or enhance it by adding an extra hierarchy level for each jobid, since each jobid can now have multiple histories. If we can reach an agreement, I'll be glad to take on the implementation. Thanks in advance.
> Enhance history server to archive multiple histories per jobid > -- > > Key: FLINK-33715 > URL: https://issues.apache.org/jira/browse/FLINK-33715 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: dongwoo.kim >Priority: Minor -- This message was sent by Atlassian Jira
[jira] [Created] (FLINK-33715) Enhance history server to archive multiple histories per jobid
dongwoo.kim created FLINK-33715: --- Summary: Enhance history server to archive multiple histories per jobid Key: FLINK-33715 URL: https://issues.apache.org/jira/browse/FLINK-33715 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: dongwoo.kim Hello Flink team, I'd like to propose an improvement to how the job manager archives job histories and how the Flink history server fetches them. Currently, only one job history per jobid can be archived and fetched. When a Flink job tries to archive its history more than once, a 'FileAlreadyExistsException' error usually occurs. This makes sense in most cases, since a job typically gets a new ID when it is restarted from the latest checkpoint/savepoint. However, there's a specific situation where this behavior can be problematic: 1) When we upgrade a job using the savepoint mode, the job's first history gets successfully archived. 2) If the same job later fails due to an error, its history isn't archived again because there's already a record with the same job ID. This can be an issue because the most valuable information – why the job failed – gets lost. As a simple solution, I suggest including currentTimeMillis in the history filename along with the jobid ( \{jobid}-\{currentTimeMillis} ). On the history fetching side, we would parse the jobid before the *"-"* delimiter and fetch all the histories for that jobid. For the UI, we can keep the current display or enhance it by adding an extra hierarchy level for each jobid, since each jobid can now have multiple histories. If we can reach an agreement, I'll be glad to take on the implementation. Thanks in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
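The naming scheme proposed above can be sketched in a few lines of Java. This is only an illustration, not code from the Flink history server: the class and method names are hypothetical, and it assumes the job id itself never contains the "-" delimiter (Flink job ids are normally printed as 32 hexadecimal characters).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for the proposed "{jobid}-{currentTimeMillis}" archive naming. */
final class ArchiveFileNames {
    private static final char DELIMITER = '-';

    private ArchiveFileNames() {}

    /** Builds an archive file name from the job id and the archiving timestamp. */
    static String archiveName(String jobId, long archivedAtMillis) {
        return jobId + DELIMITER + archivedAtMillis;
    }

    /** Returns the part before the first delimiter; legacy names without a suffix are returned unchanged. */
    static String jobIdOf(String fileName) {
        int idx = fileName.indexOf(DELIMITER);
        return idx < 0 ? fileName : fileName.substring(0, idx);
    }

    /** Selects every archive file that belongs to the given job id. */
    static List<String> archivesFor(String jobId, List<String> fileNames) {
        List<String> matches = new ArrayList<>();
        for (String name : fileNames) {
            if (jobIdOf(name).equals(jobId)) {
                matches.add(name);
            }
        }
        return matches;
    }
}
```

Accepting names without a suffix in `jobIdOf` is a design choice of this sketch so that archives written under the current one-file-per-jobid scheme would still be found.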
[jira] [Updated] (FLINK-33454) Adding tls configuration to IngressSpec
[ https://issues.apache.org/jira/browse/FLINK-33454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-33454: Description: Hello, I want to propose new configuration parameter for IngressSpec. Currently flink k8s operator creates ingress resource as we define but it doesn't support tls configuration to secure ingress. How about adding tls parameter on IngressSpec? *IngressSpec* tls: IngressTLS *IngressTLSSpec* Hosts: List SecretName: String If we could reach an agreement I'll be glad to take on the implementation. Thanks in advance. was: Hello, I want to propose new configuration parameter for IngressSpec. Currently flink k8s operator creates ingress resource as we define but it doesn't support tls configuration to secure ingress. How about adding tls parameter on IngressSpec? *IngressSpec* tls: IngressTLS *IngressTLSSpec* Hosts: List SecretName: String If we could reach to agreement I'll be glad to take on the implementation. Thanks in advance. > Adding tls configuration to IngressSpec > --- > > Key: FLINK-33454 > URL: https://issues.apache.org/jira/browse/FLINK-33454 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: dongwoo.kim >Priority: Minor > > Hello, I want to propose new configuration parameter for IngressSpec. > Currently flink k8s operator creates ingress resource as we define but it > doesn't support tls configuration to secure ingress. > How about adding tls parameter on IngressSpec? > *IngressSpec* > tls: IngressTLS > *IngressTLSSpec* > Hosts: List > SecretName: String > If we could reach an agreement I'll be glad to take on the implementation. > Thanks in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33454) Adding tls configuration to IngressSpec
dongwoo.kim created FLINK-33454: --- Summary: Adding tls configuration to IngressSpec Key: FLINK-33454 URL: https://issues.apache.org/jira/browse/FLINK-33454 Project: Flink Issue Type: New Feature Components: Kubernetes Operator Reporter: dongwoo.kim Hello, I want to propose new configuration parameter for IngressSpec. Currently flink k8s operator creates ingress resource as we define but it doesn't support tls configuration to secure ingress. How about adding tls parameter on IngressSpec? *IngressSpec* tls: IngressTLS *IngressTLSSpec* Hosts: List SecretName: String If we could reach to agreement I'll be glad to take on the implementation. Thanks in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
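For discussion, the proposed fields could look roughly like the plain Java classes below. This is a sketch under assumptions, not the operator's actual API: the class names are hypothetical, the element type of the hosts list is assumed to be String (the proposal only says "List"), and the real IngressSpec has other fields that are omitted here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical TLS entry mirroring the proposed fields: hosts and secretName. */
final class IngressTlsSketch {
    private final List<String> hosts; // element type String is an assumption
    private final String secretName;

    IngressTlsSketch(List<String> hosts, String secretName) {
        // Defensive copy so later changes to the caller's list do not leak in.
        this.hosts = Collections.unmodifiableList(new ArrayList<>(hosts));
        this.secretName = secretName;
    }

    List<String> getHosts() {
        return hosts;
    }

    String getSecretName() {
        return secretName;
    }
}

/** Hypothetical ingress spec carrying only the proposed optional tls field. */
final class IngressSpecSketch {
    private final IngressTlsSketch tls; // null means TLS is not configured

    IngressSpecSketch(IngressTlsSketch tls) {
        this.tls = tls;
    }

    IngressTlsSketch getTls() {
        return tls;
    }

    boolean isTlsEnabled() {
        return tls != null;
    }
}
```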
[jira] [Comment Edited] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780439#comment-17780439 ] dongwoo.kim edited comment on FLINK-33324 at 10/27/23 3:48 PM: --- Thanks for the opinion [~roman], and sorry for the late reply. I've been thinking about ways to detect the slow restore issue in Flink, and here are a couple of methods I've considered *1. Alerts Based on Checkpoint Failures.* Since Flink tries to trigger checkpoints even when not all subtasks are in the running state, those checkpoints always fail. While this indicates a possible issue, it doesn't specifically tell us whether it's due to slow recovery. *2. Using the REST API* Another approach could be to leverage Flink's REST API to check the status of subtasks. I found that even if some subtasks are still initializing state, the *"/jobs/:jobid/status"* endpoint often shows them as {*}running{*}. This was unexpected. However, we can look at the *"/jobs/:jobid/vertices/:vertexid/taskmanagers"* endpoint to identify subtasks that are in the *initializing* state. A limitation here is that this endpoint only shows the current state and doesn't provide the duration for which a subtask has been in that state. So, some additional logic might be needed to track how long subtasks have been 'stuck'. Considering the above, if we're looking to enhance Flink's observability, what about introducing a metric that shows restore time? Perhaps something named {*}lastDurationOfRestore{*}? WDYT? Thanks.
was (Author: JIRAUSER300481): Thanks for the opinion [~roman], and sorry for the late reply. I've been thinking about ways to detect the slow restore issue in Flink, and here are a couple of methods I've considered: *1. Alerts Based on Checkpoint Failures.* Since Flink tries to trigger checkpoints even when not all subtasks are in the running state, those checkpoints always fail. While this indicates a possible issue, it doesn't specifically tell us whether it's due to slow recovery. *2. Using the REST API* Another approach could be to leverage Flink's REST API to check the status of subtasks. I found that even if some subtasks are still initializing state, the *"/jobs/:jobid/status"* endpoint often shows them as {*}running{*}. This was unexpected. However, we can look at the *"/jobs/:jobid/vertices/:vertexid/taskmanagers"* endpoint to identify subtasks that are in the *initializing* state. A limitation here is that this endpoint only shows the current state and doesn't provide the duration for which a subtask has been in that state. So, some additional logic might be needed to track how long subtasks have been 'stuck'. Considering the above, if we're looking to enhance Flink's observability, what about introducing a metric that shows restore time? Perhaps something named {*}lastDurationOfRestore{*}? WDYT? Thanks.
> Add flink managed timeout mechanism for backend restore operation > - > > Key: FLINK-33324 > URL: https://issues.apache.org/jira/browse/FLINK-33324 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: dongwoo.kim >Priority: Minor > Attachments: image-2023-10-20-15-16-53-324.png, > image-2023-10-20-17-42-11-504.png
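The "additional logic" mentioned in the comment above, tracking how long a subtask has been reported as initializing across repeated polls, could be sketched as follows. This is an illustrative sketch only: the class is hypothetical, fetching and parsing the REST response is left to the caller, and the status strings "INITIALIZING" and "RUNNING" are assumed to be what the endpoint reports.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical tracker that remembers when each subtask was first seen initializing. */
final class InitializingTracker {
    private final Map<String, Long> firstSeenMillis = new HashMap<>();

    /** Records one poll result, given as a map from subtask key to reported status. */
    void observe(Map<String, String> statusBySubtask, long nowMillis) {
        // Forget subtasks that no longer appear in the poll at all.
        firstSeenMillis.keySet().retainAll(statusBySubtask.keySet());
        for (Map.Entry<String, String> e : statusBySubtask.entrySet()) {
            if ("INITIALIZING".equals(e.getValue())) {
                // Keep the earliest timestamp while the subtask stays in this state.
                firstSeenMillis.putIfAbsent(e.getKey(), nowMillis);
            } else {
                firstSeenMillis.remove(e.getKey());
            }
        }
    }

    /** Returns the sorted keys of subtasks that have been initializing for at least the threshold. */
    List<String> stuckLongerThan(Duration threshold, long nowMillis) {
        List<String> stuck = new ArrayList<>();
        for (Map.Entry<String, Long> e : firstSeenMillis.entrySet()) {
            if (nowMillis - e.getValue() >= threshold.toMillis()) {
                stuck.add(e.getKey());
            }
        }
        Collections.sort(stuck);
        return stuck;
    }
}
```

A monitoring job would call `observe` after each poll and alert when `stuckLongerThan` returns a non-empty list; the measured duration is only as precise as the polling interval.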
[jira] [Commented] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780439#comment-17780439 ] dongwoo.kim commented on FLINK-33324: - Thanks for the opinion [~roman], and sorry for the late reply. I've been thinking about ways to detect the slow restore issue in Flink, and here are a couple of methods I've considered: *1. Alerts Based on Checkpoint Failures.* Since Flink tries to trigger checkpoints even when not all subtasks are in the running state, those checkpoints always fail. While this indicates a possible issue, it doesn't specifically tell us whether it's due to slow recovery. *2. Using the REST API* Another approach could be to leverage Flink's REST API to check the status of subtasks. I found that even if some subtasks are still initializing state, the *"/jobs/:jobid/status"* endpoint often shows them as {*}running{*}. This was unexpected. However, we can look at the *"/jobs/:jobid/vertices/:vertexid/taskmanagers"* endpoint to identify subtasks that are in the *initializing* state. A limitation here is that this endpoint only shows the current state and doesn't provide the duration for which a subtask has been in that state. So, some additional logic might be needed to track how long subtasks have been 'stuck'. Considering the above, if we're looking to enhance Flink's observability, what about introducing a metric that shows restore time? Perhaps something named {*}lastDurationOfRestore{*}? WDYT? Thanks. > Add flink managed timeout mechanism for backend restore operation > - > > Key: FLINK-33324 > URL: https://issues.apache.org/jira/browse/FLINK-33324 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: dongwoo.kim >Priority: Minor > Attachments: image-2023-10-20-15-16-53-324.png, > image-2023-10-20-17-42-11-504.png > > > Hello community, I would like to share an issue our team recently faced and propose a feature to mitigate similar problems in the future.
> h2. Issue > Our Flink streaming job encountered consecutive checkpoint failures and subsequently attempted a restart. > This failure occurred due to timeouts in two subtasks located within the same task manager. > The restore operation for this particular task manager also got stuck, resulting in an "initializing" state lasting over an hour. > Once we realized the hang during the restore operation, we terminated the task manager pod, resolving the issue. > !image-2023-10-20-15-16-53-324.png|width=683,height=604! > The sequence of events was as follows: > 1. Checkpoint timed out for subtasks within the task manager, referred to as tm-32. > 2. The Flink job failed and initiated a restart. > 3. Restoration was successful for 282 subtasks, but got stuck for the 2 subtasks in tm-32. > 4. While the Flink tasks weren't fully in the running state, checkpointing was still being triggered, leading to consecutive checkpoint failures. > 5. These checkpoint failures seemed to be ignored and did not count toward the execution.checkpointing.tolerable-failed-checkpoints configuration. > As a result, the job remained in the initialization phase for a very long period. > 6. Once we found this, we terminated the tm-32 pod, leading to a successful Flink job restart. > h2. Suggestion > I feel that a Flink job remaining in the initializing state indefinitely is not ideal. > To enhance resilience, I think it would be helpful if we could add a timeout feature for the restore operation. > If the restore operation exceeds a specified duration, an exception should be thrown, causing the job to fail. > This way, we can address restore-related issues similarly to how we handle checkpoint failures. > h2. Notes > Just to add, I've made a basic version of this feature to see if it works as expected. > I've attached a picture from the Flink UI that shows the timeout exception that happened during the restore operation. > It's just a start, but I hope it helps with our discussion.
> (I've simulated network chaos using the [litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts] chaos engineering tool.) > !image-2023-10-20-17-42-11-504.png|width=940,height=317! > > Thank you for considering my proposal. I'm looking forward to hearing your thoughts. > If there's agreement on this, I'd be happy to work on implementing this feature.
[jira] [Comment Edited] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1755#comment-1755 ] dongwoo.kim edited comment on FLINK-33324 at 10/20/23 1:25 PM: --- Hi, [~pnowojski] Thanks for the opinion. First, about the code: I simply wrapped the main logic [here|https://github.com/apache/flink/blob/72e302310ba55bb5f35966ed448243aae36e193e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java#L94] in a callable object and combined it with future.get(timeout). Please note that this was just an initial feasibility check without a deep dive into the Flink code. When considering manual action by a human, I agree that solving this issue with an alerting system seems practical. However, our goal for handling the failover loop was to automate operations using the failure-rate restart strategy and a cronJob that monitors the Flink job's status. Instead of adding ambiguous conditions to the cronJob, treating an unusually long restore operation as a failure simplifies our process. Yet, I understand from the feedback that this approach might fit our team's unique needs more and might not be as helpful for everyone else.
was (Author: JIRAUSER300481): Hi, [~pnowojski] Thanks for the opinion. First, about the code: I simply wrapped the main logic [here|https://github.com/apache/flink/blob/72e302310ba55bb5f35966ed448243aae36e193e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java#L94] in a callable object and combined it with future.get(timeout). Please note that this was just an initial feasibility check without a deep dive into the Flink code. When considering manual action by a human, I agree that solving this issue with an alerting system seems practical. However, our goal for handling the failover loop was to automate operations using the failure-rate restart strategy and a cronJob that monitors the Flink job's status. Instead of adding complex conditions to the cronJob, treating an unusually long restore operation as a failure simplifies our process. Yet, I understand from the feedback that this approach might fit our team's unique needs more and might not be as helpful for everyone else.
> Add flink managed timeout mechanism for backend restore operation > - > > Key: FLINK-33324 > URL: https://issues.apache.org/jira/browse/FLINK-33324 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: dongwoo.kim >Priority: Minor > Attachments: image-2023-10-20-15-16-53-324.png, > image-2023-10-20-17-42-11-504.png
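The approach described in the comment above, wrapping the restore logic in a callable and waiting on it with future.get(timeout), can be illustrated with plain java.util.concurrent. This is a minimal sketch and not the actual change to BackendRestorerProcedure: the class and method names are hypothetical, and a real implementation would also have to clean up partially restored state.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper that fails a restore attempt once it exceeds a timeout. */
final class TimedRestore {
    private TimedRestore() {}

    static <T> T runWithTimeout(Callable<T> restore, Duration timeout) throws Exception {
        // Daemon thread so that a hanging restore cannot keep the JVM alive.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "restore-with-timeout");
            t.setDaemon(true);
            return t;
        });
        Future<T> future = executor.submit(restore);
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupt the restore thread; this only helps if the restore code reacts to interruption.
            future.cancel(true);
            throw new IllegalStateException("Restore did not finish within " + timeout, e);
        } catch (ExecutionException e) {
            // Surface the original failure of the restore logic where possible.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

One caveat of this design: cancelling the future only interrupts the worker thread, so a restore blocked in non-interruptible I/O would keep running in the background even though the caller has already seen the failure.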
[jira] [Comment Edited] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1755#comment-1755 ] dongwoo.kim edited comment on FLINK-33324 at 10/20/23 1:24 PM: --- Hi, [~pnowojski] Thanks for the opinion. First, about the code: I simply wrapped the main logic [here|https://github.com/apache/flink/blob/72e302310ba55bb5f35966ed448243aae36e193e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java#L94] in a callable object and combined it with future.get(timeout). Please note that this was just an initial feasibility check without a deep dive into the Flink code. When considering manual action by a human, I agree that solving this issue with an alerting system seems practical. However, our goal for handling the failover loop was to automate operations using the failure-rate restart strategy and a cronJob that monitors the Flink job's status. Instead of adding complex conditions to the cronJob, treating an unusually long restore operation as a failure simplifies our process. Yet, I understand from the feedback that this approach might fit our team's unique needs more and might not be as helpful for everyone else.
was (Author: JIRAUSER300481): Hi, [~pnowojski] Thanks for the opinion. First, about the code: I simply wrapped the main logic [here|https://github.com/apache/flink/blob/72e302310ba55bb5f35966ed448243aae36e193e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java#L94] in a callable object and combined it with future.get(timeout). Please note that this was just an initial feasibility check without a deep dive into the Flink code. When considering manual action by a human, solving this issue with an alerting system seems practical. However, our goal for handling the failover loop was to automate operations using the failure-rate restart strategy and a cronJob that monitors the Flink job's status. Instead of adding complex conditions to the cronJob, treating an unusually long restore operation as a failure simplifies our process. Yet, I understand from the feedback that this approach might fit our team's unique needs more and might not be as helpful for everyone else.
> Add flink managed timeout mechanism for backend restore operation > - > > Key: FLINK-33324 > URL: https://issues.apache.org/jira/browse/FLINK-33324 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: dongwoo.kim >Priority: Minor > Attachments: image-2023-10-20-15-16-53-324.png, > image-2023-10-20-17-42-11-504.png
[jira] [Commented] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1757#comment-1757 ] dongwoo.kim commented on FLINK-33324: - Hi, [~roman] Thanks for considering my suggestion. For some background, we use the failure-rate restart strategy and a monitoring cronJob to manage our Flink application. By marking a long-hanging restore operation as a failure, a retry can be initiated. If the retries don't resolve the issue, the job should ultimately fail. This way, the cronJob monitoring the Flink application can quickly detect the failure and redeploy the job from its last state. (In this scenario a new task manager pod is created, so this specific issue could be resolved.) As you pointed out, introducing a timeout might have various side effects. Whether one can tolerate a lengthy restore or prefers quicker retries and redeployments might vary based on operational needs. What if we make this an optional feature? By default, there would be no timeout, but developers could configure one if desired. Thanks in advance.
> Add flink managed timeout mechanism for backend restore operation > - > > Key: FLINK-33324 > URL: https://issues.apache.org/jira/browse/FLINK-33324 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: dongwoo.kim >Priority: Minor > Attachments: image-2023-10-20-15-16-53-324.png, > image-2023-10-20-17-42-11-504.png
[jira] [Commented] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1756#comment-1756 ] dongwoo.kim commented on FLINK-33324: - Hi, [~Zakelly] Thanks for sharing your opinion and experience. Unfortunately we failed to discover the root cause. However, we noticed that during the problem, our hdfs wasn't under any unusual load, in fact, it was below its average. Additionally, our state size isn't particularly large. All other tm completed the restore process, except one that got stuck for over an hour. After removing that specific tm, everything went back to normal. Your feedback made me thinking enforcing timeout for restore operation might have hugh side effect. Perhaps I was too focused on our specific situation. What if we keep the default setting without a timeout, but allow developers the option to set one if needed? Depending on their specific needs and SLO, long hanging restore could be considered as failure. WDYT? > Add flink managed timeout mechanism for backend restore operation > - > > Key: FLINK-33324 > URL: https://issues.apache.org/jira/browse/FLINK-33324 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: dongwoo.kim >Priority: Minor > Attachments: image-2023-10-20-15-16-53-324.png, > image-2023-10-20-17-42-11-504.png > > > Hello community, I would like to share an issue our team recently faced and > propose a feature to mitigate similar problems in the future. > h2. Issue > Our Flink streaming job encountered consecutive checkpoint failures and > subsequently attempted a restart. > This failure occurred due to timeouts in two subtasks located within the same > task manager. > The restore operation for this particular task manager also got stuck, > resulting in an "initializing" state lasting over an hour. > Once we realized the hang during the restore operation, we terminated the > task manager pod, resolving the issue. 
> !image-2023-10-20-15-16-53-324.png|width=683,height=604! > The sequence of events was as follows: > 1. Checkpoint timed out for subtasks within the task manager, referred to as > tm-32. > 2. The Flink job failed and initiated a restart. > 3. Restoration was successful for 282 subtasks, but got stuck for the 2 > subtasks in tm-32. > 4. While the Flink tasks weren't fully in running state, checkpointing was > still being triggered, leading to consecutive checkpoint failures. > 5. These checkpoint failures seemed to be ignored, and did not count to the > execution.checkpointing.tolerable-failed-checkpoints configuration. > As a result, the job remained in the initialization phase for very long > period. > 6. Once we found this, we terminated the tm-32 pod, leading to a successful > Flink job restart. > h2. Suggestion > I feel that, a Flink job remaining in the initializing state indefinitely is > not ideal. > To enhance resilience, I think it would be helpful if we could add timeout > feature for restore operation. > If the restore operation exceeds a specified duration, an exception should be > thrown, causing the job to fail. > This way, we can address restore-related issues similarly to how we handle > checkpoint failures. > h2. Notes > Just to add, I've made a basic version of this feature to see if it works as > expected. > I've attached a picture from the Flink UI that shows the timeout exception > happened during restore operation. > It's just a start, but I hope it helps with our discussion. > (I've simulated network chaos, using > [litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts] > chaos engineering tool.) > !image-2023-10-20-17-42-11-504.png|width=940,height=317! > > Thank you for considering my proposal. I'm looking forward to hear your > thoughts. > If there's agreement on this, I'd be happy to work on implementing this > feature. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
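The idea raised in the comment above, keeping "no timeout" as the default while letting users opt in, might look roughly like the following sketch. It is only an illustration under stated assumptions: the class name, the configuration key {{restore.timeout-ms}}, and the plain {{Map}} used as configuration are all hypothetical and are not existing Flink APIs or options.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical sketch of an opt-in restore timeout; not part of Flink. */
final class OptInRestoreTimeoutSketch {

    // Hypothetical key, used only for this illustration; not an existing Flink option.
    static final String TIMEOUT_KEY = "restore.timeout-ms";

    /** Returns an empty Optional when the key is absent, so the default stays "no timeout". */
    static Optional<Duration> readTimeout(Map<String, String> conf) {
        return Optional.ofNullable(conf.get(TIMEOUT_KEY))
                .map(Long::parseLong)
                .map(Duration::ofMillis);
    }

    /** Runs the restore task directly by default, or with a deadline when one is configured. */
    static <T> T restore(Supplier<T> restoreTask, Optional<Duration> timeout) {
        if (!timeout.isPresent()) {
            // Default behaviour: block for as long as the restore takes.
            return restoreTask.get();
        }
        // Opt-in behaviour: wait on another thread's result, but only up to the deadline.
        CompletableFuture<T> future = CompletableFuture.supplyAsync(restoreTask);
        try {
            return future.get(timeout.get().toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // A restore that runs too long is reported as a failure.
            throw new IllegalStateException("Restore exceeded " + timeout.get(), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for restore", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Restore failed", e.getCause());
        }
    }
}
```

With no key set, {{readTimeout}} returns an empty {{Optional}} and {{restore}} behaves exactly like a direct call, which matches the "keep the default" part of the proposal. Note that this sketch only stops waiting on timeout; it does not stop the background task itself.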
[jira] [Commented] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1755#comment-1755 ] dongwoo.kim commented on FLINK-33324: - Hi [~pnowojski], thanks for the opinion. First, about the code: I simply wrapped the main logic [here|https://github.com/apache/flink/blob/72e302310ba55bb5f35966ed448243aae36e193e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java#L94] in a Callable object and combined it with future.get(timeout). Please note that it was just an initial feasibility check, without a deep dive into the Flink code. If manual action by a human is acceptable, solving this issue with an alerting system seems practical. However, our goal for handling the failover loop was to automate operations using the failure-rate restart strategy and a cronJob that monitors the Flink job's status. Instead of adding complex conditions to the cronJob, treating an unusually long restore operation as a failure simplifies our process. Still, I understand from the feedback that this approach might suit our team's specific needs and might not be as helpful for everyone else. > Add flink managed timeout mechanism for backend restore operation > - > > Key: FLINK-33324 > URL: https://issues.apache.org/jira/browse/FLINK-33324 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: dongwoo.kim >Priority: Minor > Attachments: image-2023-10-20-15-16-53-324.png, > image-2023-10-20-17-42-11-504.png > > > Hello community, I would like to share an issue our team recently faced and > propose a feature to mitigate similar problems in the future. > h2. Issue > Our Flink streaming job encountered consecutive checkpoint failures and > subsequently attempted a restart. > This failure occurred due to timeouts in two subtasks located within the same > task manager. 
> The restore operation for this particular task manager also got stuck, > resulting in an "initializing" state lasting over an hour. > Once we realized the hang during the restore operation, we terminated the > task manager pod, resolving the issue. > !image-2023-10-20-15-16-53-324.png|width=683,height=604! > The sequence of events was as follows: > 1. Checkpoint timed out for subtasks within the task manager, referred to as > tm-32. > 2. The Flink job failed and initiated a restart. > 3. Restoration was successful for 282 subtasks, but got stuck for the 2 > subtasks in tm-32. > 4. While the Flink tasks weren't fully in running state, checkpointing was > still being triggered, leading to consecutive checkpoint failures. > 5. These checkpoint failures seemed to be ignored, and did not count to the > execution.checkpointing.tolerable-failed-checkpoints configuration. > As a result, the job remained in the initialization phase for very long > period. > 6. Once we found this, we terminated the tm-32 pod, leading to a successful > Flink job restart. > h2. Suggestion > I feel that, a Flink job remaining in the initializing state indefinitely is > not ideal. > To enhance resilience, I think it would be helpful if we could add timeout > feature for restore operation. > If the restore operation exceeds a specified duration, an exception should be > thrown, causing the job to fail. > This way, we can address restore-related issues similarly to how we handle > checkpoint failures. > h2. Notes > Just to add, I've made a basic version of this feature to see if it works as > expected. > I've attached a picture from the Flink UI that shows the timeout exception > happened during restore operation. > It's just a start, but I hope it helps with our discussion. > (I've simulated network chaos, using > [litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts] > chaos engineering tool.) 
> !image-2023-10-20-17-42-11-504.png|width=940,height=317! > > Thank you for considering my proposal. I'm looking forward to hear your > thoughts. > If there's agreement on this, I'd be happy to work on implementing this > feature. -- This message was sent by Atlassian Jira (v8.20.10#820010)
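The approach described in the comment above (wrapping the restore logic in a Callable and bounding it with future.get(timeout)) can be sketched in plain Java as follows. This is a minimal illustration, not the actual prototype and not Flink code: the class and method names are hypothetical, and only standard java.util.concurrent calls are used.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper illustrating Callable + Future.get(timeout); not part of Flink. */
final class RestoreTimeoutSketch {

    /** Runs the task on a worker thread and fails if it does not finish within the timeout. */
    static <T> T callWithTimeout(Callable<T> restoreTask, Duration timeout) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(restoreTask);
        try {
            // Blocks the caller for at most the given duration.
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Request cancellation; this interrupts the worker thread.
            future.cancel(true);
            throw new IllegalStateException("Restore did not finish within " + timeout, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for restore", e);
        } catch (ExecutionException e) {
            // The task itself threw; surface its cause.
            throw new IllegalStateException("Restore failed", e.getCause());
        } finally {
            // Interrupts the worker if it is still running. A task that ignores
            // interrupts would keep its thread alive despite the timeout.
            executor.shutdownNow();
        }
    }
}
```

One design caveat worth stating: the timeout only bounds how long the caller waits. Whether the underlying work actually stops depends on the task reacting to interruption, which is one reason a real implementation would need a closer look at the restore code path.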
[jira] [Updated] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-33324: Description: Hello community, I would like to share an issue our team recently faced and propose a feature to mitigate similar problems in the future. h2. Issue Our Flink streaming job encountered consecutive checkpoint failures and subsequently attempted a restart. This failure occurred due to timeouts in two subtasks located within the same task manager. The restore operation for this particular task manager also got stuck, resulting in an "initializing" state lasting over an hour. Once we realized the hang during the restore operation, we terminated the task manager pod, resolving the issue. !image-2023-10-20-15-16-53-324.png|width=683,height=604! The sequence of events was as follows: 1. Checkpoint timed out for subtasks within the task manager, referred to as tm-32. 2. The Flink job failed and initiated a restart. 3. Restoration was successful for 282 subtasks, but got stuck for the 2 subtasks in tm-32. 4. While the Flink tasks weren't fully in running state, checkpointing was still being triggered, leading to consecutive checkpoint failures. 5. These checkpoint failures seemed to be ignored, and did not count to the execution.checkpointing.tolerable-failed-checkpoints configuration. As a result, the job remained in the initialization phase for very long period. 6. Once we found this, we terminated the tm-32 pod, leading to a successful Flink job restart. h2. Suggestion I feel that, a Flink job remaining in the initializing state indefinitely is not ideal. To enhance resilience, I think it would be helpful if we could add timeout feature for restore operation. If the restore operation exceeds a specified duration, an exception should be thrown, causing the job to fail. This way, we can address restore-related issues similarly to how we handle checkpoint failures. h2. 
Notes Just to add, I've made a basic version of this feature to see if it works as expected. I've attached a picture from the Flink UI that shows the timeout exception happened during restore operation. It's just a start, but I hope it helps with our discussion. (I've simulated network chaos, using [litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts] chaos engineering tool.) !image-2023-10-20-17-42-11-504.png|width=940,height=317! Thank you for considering my proposal. I'm looking forward to hear your thoughts. If there's agreement on this, I'd be happy to work on implementing this feature. was: Hello community, I would like to share an issue our team recently faced and propose a feature to mitigate similar problems in the future. h2. Issue Our Flink streaming job encountered consecutive checkpoint failures and subsequently attempted a restart. This failure occurred due to timeouts in two subtasks located within the same task manager. The restore operation for this particular task manager also got stuck, resulting in an "initializing" state lasting over an hour. Once we realized the hang during the restore operation, we terminated the task manager pod, resolving the issue. !image-2023-10-20-15-16-53-324.png|width=565,height=500! The sequence of events was as follows: 1. Checkpoint timed out for subtasks within the task manager, referred to as tm-32. 2. The Flink job failed and initiated a restart. 3. Restoration was successful for 282 subtasks, but got stuck for the 2 subtasks in tm-32. 4. While the Flink tasks weren't fully in running state, checkpointing was still being triggered, leading to consecutive checkpoint failures. 5. These checkpoint failures seemed to be ignored, and did not count to the execution.checkpointing.tolerable-failed-checkpoints configuration. As a result, the job remained in the initialization phase for very long period. 6. 
Once we found this, we terminated the tm-32 pod, leading to a successful Flink job restart. h2. Suggestion I feel that, a Flink job remaining in the initializing state indefinitely is not ideal. To enhance resilience, I think it would be helpful if we could add timeout feature for restore operation. If the restore operation exceeds a specified duration, an exception should be thrown, causing the job to fail. This way, we can address restore-related issues similarly to how we handle checkpoint failures. h2. Notes Just to add, I've made a basic version of this feature to see if it works as expected. I've attached a picture from the Flink UI that shows the timeout exception happened during restore operation. It's just a start, but I hope it helps with our discussion. (I've simulated network chaos, using [litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts] chaos
[jira] [Updated] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-33324: Description: Hello community, I would like to share an issue our team recently faced and propose a feature to mitigate similar problems in the future. h2. Issue Our Flink streaming job encountered consecutive checkpoint failures and subsequently attempted a restart. This failure occurred due to timeouts in two subtasks located within the same task manager. The restore operation for this particular task manager also got stuck, resulting in an "initializing" state lasting over an hour. Once we realized the hang during the restore operation, we terminated the task manager pod, resolving the issue. !image-2023-10-20-15-16-53-324.png|width=565,height=500! The sequence of events was as follows: 1. Checkpoint timed out for subtasks within the task manager, referred to as tm-32. 2. The Flink job failed and initiated a restart. 3. Restoration was successful for 282 subtasks, but got stuck for the 2 subtasks in tm-32. 4. While the Flink tasks weren't fully in running state, checkpointing was still being triggered, leading to consecutive checkpoint failures. 5. These checkpoint failures seemed to be ignored, and did not count to the execution.checkpointing.tolerable-failed-checkpoints configuration. As a result, the job remained in the initialization phase for very long period. 6. Once we found this, we terminated the tm-32 pod, leading to a successful Flink job restart. h2. Suggestion I feel that, a Flink job remaining in the initializing state indefinitely is not ideal. To enhance resilience, I think it would be helpful if we could add timeout feature for restore operation. If the restore operation exceeds a specified duration, an exception should be thrown, causing the job to fail. This way, we can address restore-related issues similarly to how we handle checkpoint failures. h2. 
Notes Just to add, I've made a basic version of this feature to see if it works as expected. I've attached a picture from the Flink UI that shows the timeout exception happened during restore operation. It's just a start, but I hope it helps with our discussion. (I've simulated network chaos, using [litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts] chaos engineering tool.) !image-2023-10-20-17-42-11-504.png|width=1121,height=378! Thank you for considering my proposal. I'm looking forward to hear your thoughts. If there's agreement on this, I'd be happy to work on implementing this feature. was: Hello community, I would like to share an issue our team recently faced and propose a feature to mitigate similar problems in the future. h2. Issue Our Flink streaming job encountered consecutive checkpoint failures and subsequently attempted a restart. This failure occurred due to timeouts in two subtasks located within the same task manager. The restore operation for this particular task manager also got stuck, resulting in an "initializing" state lasting over an hour. Once we realized the hang during the restore operation, we terminated the task manager pod, resolving the issue. !image-2023-10-20-15-16-53-324.png|width=565,height=500! The sequence of events was as follows: 1. Checkpoint timed out for subtasks within the task manager, referred to as tm-32. 2. The Flink job failed and initiated a restart. 3. Restoration was successful for 282 subtasks, but got stuck for the 2 subtasks in tm-32. 4. While the Flink tasks weren't fully in running state, checkpointing was still being triggered, leading to consecutive checkpoint failures. 5. These checkpoint failures seemed to be ignored, and did not count to the execution.checkpointing.tolerable-failed-checkpoints configuration. As a result, the job remained in the initialization phase for very long period. 6. 
Once we found this, we terminated the tm-32 pod, leading to a successful Flink job restart. h2. Suggestion I feel that, a Flink job remaining in the initializing state indefinitely is not ideal. To enhance resilience, I think it would be helpful if we could add timeout feature for restore operation. If the restore operation exceeds a specified duration, an exception should be thrown, causing the job to fail. This way, we can address restore-related issues similarly to how we handle checkpoint failures. h2. Notes Just to add, I've made a basic version of this feature to see if it works. I've attached a picture from the Flink UI that shows the timeout exception happened during restore operation. It's just a start, but I hope it helps with our discussion. I've simulated network chaos, using litmus chaos engineering tool. !image-2023-10-20-17-30-10-751.png|width=839,height=283! Thank you for considering my proposal. I'm looking forward to hear
[jira] [Updated] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-33324: Description: Hello community, I would like to share an issue our team recently faced and propose a feature to mitigate similar problems in the future. h2. Issue Our Flink streaming job encountered consecutive checkpoint failures and subsequently attempted a restart. This failure occurred due to timeouts in two subtasks located within the same task manager. The restore operation for this particular task manager also got stuck, resulting in an "initializing" state lasting over an hour. Once we realized the hang during the restore operation, we terminated the task manager pod, resolving the issue. !image-2023-10-20-15-16-53-324.png|width=565,height=500! The sequence of events was as follows: 1. Checkpoint timed out for subtasks within the task manager, referred to as tm-32. 2. The Flink job failed and initiated a restart. 3. Restoration was successful for 282 subtasks, but got stuck for the 2 subtasks in tm-32. 4. While the Flink tasks weren't fully in running state, checkpointing was still being triggered, leading to consecutive checkpoint failures. 5. These checkpoint failures seemed to be ignored, and did not count to the execution.checkpointing.tolerable-failed-checkpoints configuration. As a result, the job remained in the initialization phase for very long period. 6. Once we found this, we terminated the tm-32 pod, leading to a successful Flink job restart. h2. Suggestion I feel that, a Flink job remaining in the initializing state indefinitely is not ideal. To enhance resilience, I think it would be helpful if we could add timeout feature for restore operation. If the restore operation exceeds a specified duration, an exception should be thrown, causing the job to fail. This way, we can address restore-related issues similarly to how we handle checkpoint failures. h2. 
Notes Just to add, I've made a basic version of this feature to see if it works. I've attached a picture from the Flink UI that shows the timeout exception happened during restore operation. It's just a start, but I hope it helps with our discussion. I've simulated network chaos, using litmus chaos engineering tool. !image-2023-10-20-17-30-10-751.png|width=839,height=283! Thank you for considering my proposal. I'm looking forward to hear your thoughts. If there's agreement on this, I'd be happy to work on implementing this feature. was: Hello community, I would like to share an issue our team recently faced and propose a feature to mitigate similar problems in the future. h2. Issue Our Flink streaming job encountered consecutive checkpoint failures and subsequently attempted a restart. This failure occurred due to timeouts in two subtasks located within the same task manager. The restore operation for this particular task manager also got stuck, resulting in an "initializing" state lasting over an hour. Once we realized the hang during the restore operation, we terminated the task manager pod, resolving the issue. !image-2023-10-20-15-16-53-324.png|width=847,height=749! The sequence of events was as follows: 1. Checkpoint timed out for subtasks within the task manager, referred to as tm-32. 2. The Flink job failed and initiated a restart. 3. Restoration was successful for 282 subtasks, but got stuck for the 2 subtasks in tm-32. 4. While the Flink tasks weren't fully in running state, checkpointing was still being triggered, leading to consecutive checkpoint failures. 5. These checkpoint failures seemed to be ignored, and did not count to the execution.checkpointing.tolerable-failed-checkpoints configuration. As a result, the job remained in the initialization phase for very long period. 6. Once we found this, we terminated the tm-32 pod, leading to a successful Flink job restart. h2. 
Suggestion I feel that, a Flink job remaining in the initializing state indefinitely is not ideal. To enhance resilience, I think it would be helpful if we could add timeout feature for restore operation. If the restore operation exceeds a specified duration, an exception should be thrown, causing the job to fail. This way, we can address restore-related issues similarly to how we handle checkpoint failures. h2. Notes Just to add, I've made a basic version of this feature to see if it works. I've attached a picture from the Flink UI that shows the timeout exception happened during restore operation. It's just a start, but I hope it helps with our discussion. I've simulated network chaos, using litmus chaos engineering tool. !image-2023-10-20-17-30-10-751.png|width=839,height=283! Thank you for considering my proposal. I'm looking forward to hear your thoughts. If there's agreement on this, I'd be happy to work on implementing this feature. > Add flink managed timeout mechanism for backend
[jira] [Updated] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-33324: Attachment: (was: image-2023-10-20-17-30-02-290.png) > Add flink managed timeout mechanism for backend restore operation > - > > Key: FLINK-33324 > URL: https://issues.apache.org/jira/browse/FLINK-33324 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: dongwoo.kim >Priority: Minor > Attachments: image-2023-10-20-15-16-53-324.png, > image-2023-10-20-17-42-11-504.png > > > Hello community, I would like to share an issue our team recently faced and > propose a feature to mitigate similar problems in the future. > h2. Issue > Our Flink streaming job encountered consecutive checkpoint failures and > subsequently attempted a restart. > This failure occurred due to timeouts in two subtasks located within the same > task manager. > The restore operation for this particular task manager also got stuck, > resulting in an "initializing" state lasting over an hour. > Once we realized the hang during the restore operation, we terminated the > task manager pod, resolving the issue. > !image-2023-10-20-15-16-53-324.png|width=565,height=500! > The sequence of events was as follows: > 1. Checkpoint timed out for subtasks within the task manager, referred to as > tm-32. > 2. The Flink job failed and initiated a restart. > 3. Restoration was successful for 282 subtasks, but got stuck for the 2 > subtasks in tm-32. > 4. While the Flink tasks weren't fully in running state, checkpointing was > still being triggered, leading to consecutive checkpoint failures. > 5. These checkpoint failures seemed to be ignored, and did not count to the > execution.checkpointing.tolerable-failed-checkpoints configuration. As a > result, the job remained in the initialization phase for very long period. > 6. 
Once we found this, we terminated the tm-32 pod, leading to a successful > Flink job restart. > h2. Suggestion > I feel that, a Flink job remaining in the initializing state indefinitely is > not ideal. > To enhance resilience, I think it would be helpful if we could add timeout > feature for restore operation. > If the restore operation exceeds a specified duration, an exception should be > thrown, causing the job to fail. > This way, we can address restore-related issues similarly to how we handle > checkpoint failures. > h2. Notes > Just to add, I've made a basic version of this feature to see if it works. > I've attached a picture from the Flink UI that shows the timeout exception > happened during restore operation. > It's just a start, but I hope it helps with our discussion. > I've simulated network chaos, using litmus chaos engineering tool. > !image-2023-10-20-17-30-10-751.png|width=839,height=283! > Thank you for considering my proposal. I'm looking forward to hear your > thoughts. > If there's agreement on this, I'd be happy to work on implementing this > feature. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
[ https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-33324: Attachment: image-2023-10-20-17-42-11-504.png > Add flink managed timeout mechanism for backend restore operation > - > > Key: FLINK-33324 > URL: https://issues.apache.org/jira/browse/FLINK-33324 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / State Backends >Reporter: dongwoo.kim >Priority: Minor > Attachments: image-2023-10-20-15-16-53-324.png, > image-2023-10-20-17-42-11-504.png > > > Hello community, I would like to share an issue our team recently faced and > propose a feature to mitigate similar problems in the future. > h2. Issue > Our Flink streaming job encountered consecutive checkpoint failures and > subsequently attempted a restart. > This failure occurred due to timeouts in two subtasks located within the same > task manager. > The restore operation for this particular task manager also got stuck, > resulting in an "initializing" state lasting over an hour. > Once we realized the hang during the restore operation, we terminated the > task manager pod, resolving the issue. > !image-2023-10-20-15-16-53-324.png|width=565,height=500! > The sequence of events was as follows: > 1. Checkpoint timed out for subtasks within the task manager, referred to as > tm-32. > 2. The Flink job failed and initiated a restart. > 3. Restoration was successful for 282 subtasks, but got stuck for the 2 > subtasks in tm-32. > 4. While the Flink tasks weren't fully in running state, checkpointing was > still being triggered, leading to consecutive checkpoint failures. > 5. These checkpoint failures seemed to be ignored, and did not count to the > execution.checkpointing.tolerable-failed-checkpoints configuration. As a > result, the job remained in the initialization phase for very long period. > 6. 
Once we found this, we terminated the tm-32 pod, leading to a successful > Flink job restart. > h2. Suggestion > I feel that, a Flink job remaining in the initializing state indefinitely is > not ideal. > To enhance resilience, I think it would be helpful if we could add timeout > feature for restore operation. > If the restore operation exceeds a specified duration, an exception should be > thrown, causing the job to fail. > This way, we can address restore-related issues similarly to how we handle > checkpoint failures. > h2. Notes > Just to add, I've made a basic version of this feature to see if it works. > I've attached a picture from the Flink UI that shows the timeout exception > happened during restore operation. > It's just a start, but I hope it helps with our discussion. > I've simulated network chaos, using litmus chaos engineering tool. > !image-2023-10-20-17-30-10-751.png|width=839,height=283! > Thank you for considering my proposal. I'm looking forward to hear your > thoughts. > If there's agreement on this, I'd be happy to work on implementing this > feature. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation
dongwoo.kim created FLINK-33324:
---

Summary: Add flink managed timeout mechanism for backend restore operation
Key: FLINK-33324
URL: https://issues.apache.org/jira/browse/FLINK-33324
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: dongwoo.kim
Attachments: image-2023-10-20-15-16-53-324.png, image-2023-10-20-17-30-02-290.png

Hello community, I would like to share an issue our team recently faced and propose a feature to mitigate similar problems in the future.

h2. Issue
Our Flink streaming job encountered consecutive checkpoint failures and subsequently attempted a restart.
This failure occurred due to timeouts in two subtasks located within the same task manager.
The restore operation for this particular task manager also got stuck, resulting in an "initializing" state lasting over an hour.
Once we realized the restore operation was hanging, we terminated the task manager pod, which resolved the issue.

!image-2023-10-20-15-16-53-324.png|width=847,height=749!

The sequence of events was as follows:
1. Checkpoints timed out for subtasks within the task manager, referred to as tm-32.
2. The Flink job failed and initiated a restart.
3. Restoration was successful for 282 subtasks, but got stuck for the 2 subtasks in tm-32.
4. While not all Flink tasks were in the running state, checkpoints were still being triggered, leading to consecutive checkpoint failures.
5. These checkpoint failures seemed to be ignored and did not count toward the execution.checkpointing.tolerable-failed-checkpoints configuration. As a result, the job remained in the initialization phase for a very long period.
6. Once we found this, we terminated the tm-32 pod, leading to a successful Flink job restart.

h2. Suggestion
I feel that a Flink job remaining in the initializing state indefinitely is not ideal.
To enhance resilience, I think it would be helpful if we could add a timeout for the restore operation.
If the restore operation exceeds a specified duration, an exception should be thrown, causing the job to fail.
This way, we can address restore-related issues similarly to how we handle checkpoint failures.

h2. Notes
Just to add, I've made a basic version of this feature to see if it works.
I've attached a picture from the Flink UI that shows the timeout exception raised during the restore operation.
It's just a start, but I hope it helps with our discussion.
I simulated network chaos using the litmus chaos engineering tool.

!image-2023-10-20-17-30-10-751.png|width=839,height=283!

Thank you for considering my proposal. I'm looking forward to hearing your thoughts.
If there's agreement on this, I'd be happy to work on implementing this feature.
[jira] [Commented] (FLINK-33066) Enable to inject environment variable from secret/configmap to operatorPod
[ https://issues.apache.org/jira/browse/FLINK-33066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17764455#comment-17764455 ] dongwoo.kim commented on FLINK-33066: - Thanks [~gyfora], I have opened the PR :) > Enable to inject environment variable from secret/configmap to operatorPod > -- > > Key: FLINK-33066 > URL: https://issues.apache.org/jira/browse/FLINK-33066 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: dongwoo.kim >Assignee: dongwoo.kim >Priority: Minor > Labels: pull-request-available > > Hello, I've been working with the Flink Kubernetes operator and noticed that > the {{operatorPod.env}} only allows for simple key-value pairs and doesn't > support Kubernetes {{valueFrom}} syntax. > How about changing template to support more various k8s syntax? > *Current template* > {code:java} > {{- range $k, $v := .Values.operatorPod.env }} > - name: {{ $v.name | quote }} > value: {{ $v.value | quote }} > {{- end }}{code} > > *Proposed template* > 1) Modify template like below > {code:java} > {{- with .Values.operatorPod.env }} > {{- toYaml . | nindent 12 }} > {{- end }} > {code} > 2) create extra config, *Values.operatorPod.envFrom* and utilize this > > I'd be happy to implement this update if it's approved. > Thanks in advance.
[jira] [Updated] (FLINK-33066) Enable to inject environment variable from secret/configmap to operatorPod
[ https://issues.apache.org/jira/browse/FLINK-33066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-33066: Description: Hello, I've been working with the Flink Kubernetes operator and noticed that the {{operatorPod.env}} only allows for simple key-value pairs and doesn't support Kubernetes {{valueFrom}} syntax. How about changing template to support more various k8s syntax? *Current template* {code:java} {{- range $k, $v := .Values.operatorPod.env }} - name: {{ $v.name | quote }} value: {{ $v.value | quote }} {{- end }}{code} *Proposed template* 1) Modify template like below {code:java} {{- with .Values.operatorPod.env }} {{- toYaml . | nindent 12 }} {{- end }} {code} 2) create extra config, *Values.operatorPod.envFrom* and utilize this I'd be happy to implement this update if it's approved. Thanks in advance. was: Hello, I've been working with the Flink Kubernetes operator and noticed that the {{operatorPod.env}} only allows for simple key-value pairs and doesn't support Kubernetes {{valueFrom}} syntax. How about changing template to support more various k8s syntax? *Current template* {code:java} {{- range $k, $v := .Values.operatorPod.env }} - name: {{ $v.name | quote }} value: {{ $v.value | quote }} {{- end }}{code} *Proposed template* 1) {code:java} {{- with .Values.operatorPod.env }} {{- toYaml . | nindent 12 }} {{- end }} {code} 2) create extra config, *Values.operatorPod.envFrom* and utilize this I'd be happy to implement this update if it's approved. Thanks in advance. 
> Enable to inject environment variable from secret/configmap to operatorPod > -- > > Key: FLINK-33066 > URL: https://issues.apache.org/jira/browse/FLINK-33066 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: dongwoo.kim >Priority: Minor > > Hello, I've been working with the Flink Kubernetes operator and noticed that > the {{operatorPod.env}} only allows for simple key-value pairs and doesn't > support Kubernetes {{valueFrom}} syntax. > How about changing template to support more various k8s syntax? > *Current template* > {code:java} > {{- range $k, $v := .Values.operatorPod.env }} > - name: {{ $v.name | quote }} > value: {{ $v.value | quote }} > {{- end }}{code} > > *Proposed template* > 1) Modify template like below > {code:java} > {{- with .Values.operatorPod.env }} > {{- toYaml . | nindent 12 }} > {{- end }} > {code} > 2) create extra config, *Values.operatorPod.envFrom* and utilize this > > I'd be happy to implement this update if it's approved. > Thanks in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33066) Enable to inject environment variable from secret/configmap to operatorPod
dongwoo.kim created FLINK-33066:
---

Summary: Enable to inject environment variable from secret/configmap to operatorPod
Key: FLINK-33066
URL: https://issues.apache.org/jira/browse/FLINK-33066
Project: Flink
Issue Type: Improvement
Components: Kubernetes Operator
Reporter: dongwoo.kim

Hello, I've been working with the Flink Kubernetes operator and noticed that {{operatorPod.env}} only allows simple key-value pairs and doesn't support the Kubernetes {{valueFrom}} syntax.
How about changing the template to support a wider range of Kubernetes syntax?

*Current template*
{code:java}
{{- range $k, $v := .Values.operatorPod.env }}
- name: {{ $v.name | quote }}
  value: {{ $v.value | quote }}
{{- end }}{code}

*Proposed template*
1)
{code:java}
{{- with .Values.operatorPod.env }}
{{- toYaml . | nindent 12 }}
{{- end }}
{code}
2) Create an extra config, *Values.operatorPod.envFrom*, and use it

I'd be happy to implement this update if it's approved.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-33066) Enable to inject environment variable from secret/configmap to operatorPod
[ https://issues.apache.org/jira/browse/FLINK-33066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-33066: Description: Hello, I've been working with the Flink Kubernetes operator and noticed that the {{operatorPod.env}} only allows for simple key-value pairs and doesn't support Kubernetes {{valueFrom}} syntax. How about changing template to support more various k8s syntax? *Current template* {code:java} {{- range $k, $v := .Values.operatorPod.env }} - name: {{ $v.name | quote }} value: {{ $v.value | quote }} {{- end }}{code} *Proposed template* 1) {code:java} {{- with .Values.operatorPod.env }} {{- toYaml . | nindent 12 }} {{- end }} {code} 2) create extra config, *Values.operatorPod.envFrom* and utilize this I'd be happy to implement this update if it's approved. Thanks in advance. was: Hello, I've been working with the Flink Kubernetes operator and noticed that the {{operatorPod.env}} only allows for simple key-value pairs and doesn't support Kubernetes {{valueFrom}} syntax. How about changing template to support more various k8s syntax? *Current template* {code:java} {{- range $k, $v := .Values.operatorPod.env }} - name: {{ $v.name | quote }} value: {{ $v.value | quote }} {{- end }}{code} *Proposed template* 1) {code:java} {{- with .Values.operatorPod.env }} {{- toYaml . | nindent 12 }} {{- end }} {code} 2) create extra config, *Values.operatorPod.envFrom* and utilize this I'd be happy to implement this update if it's approved. > Enable to inject environment variable from secret/configmap to operatorPod > -- > > Key: FLINK-33066 > URL: https://issues.apache.org/jira/browse/FLINK-33066 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Reporter: dongwoo.kim >Priority: Minor > > Hello, I've been working with the Flink Kubernetes operator and noticed that > the {{operatorPod.env}} only allows for simple key-value pairs and doesn't > support Kubernetes {{valueFrom}} syntax. 
> How about changing template to support more various k8s syntax? > *Current template* > {code:java} > {{- range $k, $v := .Values.operatorPod.env }} > - name: {{ $v.name | quote }} > value: {{ $v.value | quote }} > {{- end }}{code} > > *Proposed template* > 1) > {code:java} > {{- with .Values.operatorPod.env }} > {{- toYaml . | nindent 12 }} > {{- end }} > {code} > 2) create extra config, *Values.operatorPod.envFrom* and utilize this > > I'd be happy to implement this update if it's approved. > Thanks in advance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages
[ https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758611#comment-17758611 ] dongwoo.kim edited comment on FLINK-32760 at 8/24/23 2:17 PM: -- Hi [~luoyuxia] , I have opened backport PRs. Please check in your free time. 1.16: [https://github.com/apache/flink/pull/23286] 1.17: [https://github.com/apache/flink/pull/23287] 1.18: [https://github.com/apache/flink/pull/23288] was (Author: JIRAUSER300481): Hi [~luoyuxia] , I have opened backport PRs. Please check in your free time. [https://github.com/apache/flink/pull/23286] [https://github.com/apache/flink/pull/23287] [https://github.com/apache/flink/pull/23288] > Version Conflict in flink-sql-connector-hive for shaded.parquet prefix > packages > --- > > Key: FLINK-32760 > URL: https://issues.apache.org/jira/browse/FLINK-32760 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.17.1 >Reporter: dongwoo.kim >Priority: Major > Labels: pull-request-available > Attachments: image-2023-08-05-14-50-47-806.png > > > h2. Summary > In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading > parquet dependency from *hive-exec* is done. > But I think this is not enough and causing errors like below when I try to > read parquet file using sql-gateway which requires both *flink-parquet* and > *flink-sql-connector-hive* dependencies. > !image-2023-08-05-14-50-47-806.png|width=1392,height=909! > > h2. {color:#172b4d}Cause{color} > {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but > also *shaded.parquet* prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} > {color:#172b4d}So we need to shade both.{color} > {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift > 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with > Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against > 0.9.3, causing the error.{color} > h2. {color:#172b4d}Proposed solution{color} > Adding new shading rule to flink-sql-connector-hive project. > I have confirmed that adding this rule could resolve the above error. > {code:xml} > > shaded.parquet > shaded.parquet.flink.hive.shaded > {code} > > I would be happy to implement it if the proposal is accepted. Thanks > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages
[ https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758611#comment-17758611 ] dongwoo.kim commented on FLINK-32760: - Hi [~luoyuxia] , I have opened backport PRs. Please check in your free time. [https://github.com/apache/flink/pull/23286] [https://github.com/apache/flink/pull/23287] [https://github.com/apache/flink/pull/23288] > Version Conflict in flink-sql-connector-hive for shaded.parquet prefix > packages > --- > > Key: FLINK-32760 > URL: https://issues.apache.org/jira/browse/FLINK-32760 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.17.1 >Reporter: dongwoo.kim >Priority: Major > Labels: pull-request-available > Attachments: image-2023-08-05-14-50-47-806.png > > > h2. Summary > In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading > parquet dependency from *hive-exec* is done. > But I think this is not enough and causing errors like below when I try to > read parquet file using sql-gateway which requires both *flink-parquet* and > *flink-sql-connector-hive* dependencies. > !image-2023-08-05-14-50-47-806.png|width=1392,height=909! > > h2. {color:#172b4d}Cause{color} > {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but > also *shaded.parquet* prefix dependencies. > ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} > {color:#172b4d}So we need to shade both.{color} > {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift > 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with > Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against > 0.9.3, causing the error.{color} > h2. 
{color:#172b4d}Proposed solution{color} > Adding new shading rule to flink-sql-connector-hive project. > I have confirmed that adding this rule could resolve the above error. > {code:xml} > > shaded.parquet > shaded.parquet.flink.hive.shaded > {code} > > I would be happy to implement it if the proposal is accepted. Thanks > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages
[ https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752577#comment-17752577 ] dongwoo.kim commented on FLINK-32760: - Hi [~luoyuxia], I have opened the pr. I'd greatly appreciate it if you could review the pr when you have some free time. Thank you! > Version Conflict in flink-sql-connector-hive for shaded.parquet prefix > packages > --- > > Key: FLINK-32760 > URL: https://issues.apache.org/jira/browse/FLINK-32760 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.17.1 >Reporter: dongwoo.kim >Priority: Major > Labels: pull-request-available > Attachments: image-2023-08-05-14-50-47-806.png > > > h2. Summary > In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading > parquet dependency from *hive-exec* is done. > But I think this is not enough and causing errors like below when I try to > read parquet file using sql-gateway which requires both *flink-parquet* and > *flink-sql-connector-hive* dependencies. > !image-2023-08-05-14-50-47-806.png|width=1392,height=909! > > h2. {color:#172b4d}Cause{color} > {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but > also *shaded.parquet* prefix dependencies. > ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} > {color:#172b4d}So we need to shade both.{color} > {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift > 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with > Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against > 0.9.3, causing the error.{color} > h2. {color:#172b4d}Proposed solution{color} > Adding new shading rule to flink-sql-connector-hive project. 
> I have confirmed that adding this rule could resolve the above error. > {code:xml} > > shaded.parquet > shaded.parquet.flink.hive.shaded > {code} > > I would be happy to implement it if the proposal is accepted. Thanks > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages
[ https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751715#comment-17751715 ] dongwoo.kim commented on FLINK-32760: - Thanks for your help. I'll soon share the pr > Version Conflict in flink-sql-connector-hive for shaded.parquet prefix > packages > --- > > Key: FLINK-32760 > URL: https://issues.apache.org/jira/browse/FLINK-32760 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.17.1 >Reporter: dongwoo.kim >Priority: Major > Attachments: image-2023-08-05-14-50-47-806.png > > > h2. Summary > In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading > parquet dependency from *hive-exec* is done. > But I think this is not enough and causing errors like below when I try to > read parquet file using sql-gateway which requires both *flink-parquet* and > *flink-sql-connector-hive* dependencies. > !image-2023-08-05-14-50-47-806.png|width=1392,height=909! > > h2. {color:#172b4d}Cause{color} > {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but > also *shaded.parquet* prefix dependencies. > ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} > {color:#172b4d}So we need to shade both.{color} > {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift > 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with > Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against > 0.9.3, causing the error.{color} > h2. {color:#172b4d}Proposed solution{color} > Adding new shading rule to flink-sql-connector-hive project. > I have confirmed that adding this rule could resolve the above error. 
> {code:xml} > > shaded.parquet > shaded.parquet.flink.hive.shaded > {code} > > I would be happy to implement it if the proposal is accepted. Thanks > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages
[ https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-32760: Affects Version/s: 1.17.1 > Version Conflict in flink-sql-connector-hive for shaded.parquet prefix > packages > --- > > Key: FLINK-32760 > URL: https://issues.apache.org/jira/browse/FLINK-32760 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.17.1 >Reporter: dongwoo.kim >Priority: Major > Attachments: image-2023-08-05-14-50-47-806.png > > > h2. Summary > In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading > parquet dependency from *hive-exec* is done. > But I think this is not enough and causing errors like below when I try to > read parquet file using sql-gateway which requires both *flink-parquet* and > *flink-sql-connector-hive* dependencies. > !image-2023-08-05-14-50-47-806.png|width=1392,height=909! > > h2. {color:#172b4d}Cause{color} > {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but > also *shaded.parquet* prefix dependencies. > ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} > {color:#172b4d}So we need to shade both.{color} > {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift > 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with > Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against > 0.9.3, causing the error.{color} > h2. {color:#172b4d}Proposed solution{color} > Adding new shading rule to flink-sql-connector-hive project. > I have confirmed that adding this rule could resolve the above error. > {code:xml} > > shaded.parquet > shaded.parquet.flink.hive.shaded > {code} > > I would be happy to implement it if the proposal is accepted. 
Thanks > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages
[ https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-32760: Description: h2. Summary In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading parquet dependency from *hive-exec* is done. But I think this is not enough and causing errors like below when I try to read parquet file using sql-gateway which requires both *flink-parquet* and *flink-sql-connector-hive* dependencies. !image-2023-08-05-14-50-47-806.png|width=1392,height=909! h2. {color:#172b4d}Cause{color} {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but also *shaded.parquet* prefix dependencies. ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} {color:#172b4d}So we need to shade both.{color} {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 0.9.3, causing the error.{color} h2. {color:#172b4d}Proposed solution{color} Adding new shading rule to flink-sql-connector-hive project. I have confirmed that adding this rule could resolve the above error. {code:xml} shaded.parquet shaded.parquet.flink.hive.shaded {code} I would be happy to implement it if the proposal is accepted. Thanks was: h2. Summary In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading parquet dependency from *hive-exec* is done. But I think this is not enough and causing errors like below when I try to read parquet file using sql-gateway which requires both *flink-parquet* and *flink-sql-connector-hive* dependencies. !image-2023-08-05-14-50-47-806.png|width=1392,height=909! h2. 
{color:#172b4d}Cause{color} {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but also *shaded.parquet* prefix dependencies. ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} {color:#172b4d}So we need to shade both.{color} {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 0.9.3, causing the error.{color} h2. {color:#172b4d}Proposed solution{color} Adding new shading rule to flink-sql-connector-hive project. I have confirmed that if we add below rule, the above error is resolved. {code:xml} shaded.parquet shaded.parquet.flink.hive.shaded {code} I would be happy to implement it if the proposal is accepted. Thanks > Version Conflict in flink-sql-connector-hive for shaded.parquet prefix > packages > --- > > Key: FLINK-32760 > URL: https://issues.apache.org/jira/browse/FLINK-32760 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Reporter: dongwoo.kim >Priority: Major > Attachments: image-2023-08-05-14-50-47-806.png > > > h2. Summary > In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading > parquet dependency from *hive-exec* is done. > But I think this is not enough and causing errors like below when I try to > read parquet file using sql-gateway which requires both *flink-parquet* and > *flink-sql-connector-hive* dependencies. > !image-2023-08-05-14-50-47-806.png|width=1392,height=909! > > h2. {color:#172b4d}Cause{color} > {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but > also *shaded.parquet* prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} > {color:#172b4d}So we need to shade both.{color} > {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift > 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with > Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against > 0.9.3, causing the error.{color} > h2. {color:#172b4d}Proposed solution{color} > Adding new shading rule to flink-sql-connector-hive project. > I have confirmed that adding this rule could resolve the above error. > {code:xml} > > shaded.parquet > shaded.parquet.flink.hive.shaded > {code} > > I would be happy
[jira] [Updated] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages
[ https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-32760: Description: h2. Summary In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading parquet dependency from *hive-exec* is done. But I think this is not enough and causing errors like below when I try to read parquet file using sql-gateway which requires both *flink-parquet* and *flink-sql-connector-hive* dependencies. !image-2023-08-05-14-50-47-806.png|width=1392,height=909! h2. {color:#172b4d}Cause{color} {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but also *shaded.parquet* prefix dependencies. ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} {color:#172b4d}So we need to shade both.{color} {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 0.9.3, causing the error.{color} h2. {color:#172b4d}Proposed solution{color} Adding new shading rule to flink-sql-connector-hive project. I have confirmed that if we add below rule, the above error is resolved. {code:xml} shaded.parquet shaded.parquet.flink.hive.shaded {code} I would be happy to implement it if the proposal is accepted. Thanks was: h2. Summary In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading parquet dependency from *hive-exec* is done. But I think this is not enough and causing errors like below when I try to read parquet file using sql-gateway which requires both *flink-parquet* and *flink-sql-connector-hive* dependencies. !image-2023-08-05-14-50-47-806.png|width=1392,height=909! h2. 
{color:#172b4d}Cause{color} {color:#172b4d}Parquet dependency not only includes org.apache.parquet but also shaded.parquet prefix dependencies. ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 0.9.3, causing the error.{color} h2. {color:#172b4d}Proposed solution{color} Adding new shading rule to flink-sql-connector-hive project. I have confirmed that if we add below rule, the above error is resolved. {code:xml} shaded.parquet shaded.parquet.flink.hive.shaded {code} I would be happy to implement it if the proposal is accepted. Thanks > Version Conflict in flink-sql-connector-hive for shaded.parquet prefix > packages > --- > > Key: FLINK-32760 > URL: https://issues.apache.org/jira/browse/FLINK-32760 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Reporter: dongwoo.kim >Priority: Major > Attachments: image-2023-08-05-14-50-47-806.png > > > h2. Summary > In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading > parquet dependency from *hive-exec* is done. > But I think this is not enough and causing errors like below when I try to > read parquet file using sql-gateway which requires both *flink-parquet* and > *flink-sql-connector-hive* dependencies. > !image-2023-08-05-14-50-47-806.png|width=1392,height=909! > > h2. {color:#172b4d}Cause{color} > {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but > also *shaded.parquet* prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} > {color:#172b4d}So we need to shade both.{color} > {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift > 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with > Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against > 0.9.3, causing the error.{color} > h2. {color:#172b4d}Proposed solution{color} > Adding new shading rule to flink-sql-connector-hive project. > I have confirmed that if we add below rule, the above error is resolved. > {code:xml} > > shaded.parquet > shaded.parquet.flink.hive.shaded > {code} > > I would be happy to implement it if the proposal is accepted.
[jira] [Updated] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages
[ https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-32760: Description: h2. Summary In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading parquet dependency from *hive-exec* is done. But I think this is not enough and causing errors like below when I try to read parquet file using sql-gateway which requires both *flink-parquet* and *flink-sql-connector-hive* dependencies. !image-2023-08-05-14-50-47-806.png|width=1392,height=909! h2. {color:#172b4d}Cause{color} {color:#172b4d}Parquet dependency not only includes org.apache.parquet but also shaded.parquet prefix dependencies. ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 0.9.3, causing the error.{color} h2. {color:#172b4d}Proposed solution{color} Adding new shading rule to flink-sql-connector-hive project. I have confirmed that if we add below rule, the above error is resolved. {code:xml} shaded.parquet shaded.parquet.flink.hive.shaded {code} I would be happy to implement it if the proposal is accepted. Thanks was: h2. Summary In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading parquet dependency from *hive-exec* is done. But I think this is not enough and causing errors like below when I try to read parquet file using sql-gateway which requires both *flink-parquet* and *flink-sql-connector-hive* dependencies. !image-2023-08-05-14-50-47-806.png|width=1392,height=909! h2. 
{color:#172b4d}Cause{color} {color:#172b4d}Parquet dependency not only includes org.apache.parquet but also shaded.parquet prefix dependencies. ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72] {color} {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 0.9.3, causing the error.{color} h2. {color:#172b4d}Proposed solution{color} Adding new shading rule to flink-sql-connector-hive project. I have confirmed that if we add below rule, the above error is resolved. {code:xml} shaded.parquet shaded.parquet.flink.hive.shaded {code} I would be happy to implement it if the proposal is accepted. Thanks > Version Conflict in flink-sql-connector-hive for shaded.parquet prefix > packages > --- > > Key: FLINK-32760 > URL: https://issues.apache.org/jira/browse/FLINK-32760 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Reporter: dongwoo.kim >Priority: Major > Attachments: image-2023-08-05-14-50-47-806.png > > > h2. Summary > In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading > parquet dependency from *hive-exec* is done. > But I think this is not enough and causing errors like below when I try to > read parquet file using sql-gateway which requires both *flink-parquet* and > *flink-sql-connector-hive* dependencies. > !image-2023-08-05-14-50-47-806.png|width=1392,height=909! > > h2. {color:#172b4d}Cause{color} > {color:#172b4d}Parquet dependency not only includes org.apache.parquet but > also shaded.parquet prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color} > {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift > 0.16.0 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with > Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color} > {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against > 0.9.3, causing the error.{color} > h2. {color:#172b4d}Proposed solution{color} > Adding new shading rule to flink-sql-connector-hive project. > I have confirmed that if we add below rule, the above error is resolved. > {code:xml} > > shaded.parquet > shaded.parquet.flink.hive.shaded > {code} > > I would be happy to implement it if the proposal is accepted. Thanks > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages
dongwoo.kim created FLINK-32760:
---
Summary: Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages
Key: FLINK-32760
URL: https://issues.apache.org/jira/browse/FLINK-32760
Project: Flink
Issue Type: Bug
Components: Connectors / Hive
Reporter: dongwoo.kim
Attachments: image-2023-08-05-14-50-47-806.png
h2. Summary
In https://issues.apache.org/jira/browse/FLINK-23074 it seems the parquet dependency from *hive-exec* was shaded. However, I think this is not enough: it still causes errors like the one below when I try to read a parquet file through sql-gateway, which requires both the *flink-parquet* and *flink-sql-connector-hive* dependencies.
!image-2023-08-05-14-50-47-806.png|width=1392,height=909!
h2. Cause
The Parquet dependency includes not only org.apache.parquet packages but also packages under the shaded.parquet prefix. ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72])
- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 0.16.0 (prefix: {{shaded.parquet}})
- flink-sql-connector-hive depends on hive-exec 3.1.3 with Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{shaded.parquet}})
- Code compiled against Thrift 0.16.0 attempts to run against 0.9.3, causing the error.
h2. Proposed solution
Add a new shading rule to the flink-sql-connector-hive project. I have confirmed that adding the rule below resolves the above error.
{code:xml}
<relocation>
  <pattern>shaded.parquet</pattern>
  <shadedPattern>shaded.parquet.flink.hive.shaded</shadedPattern>
</relocation>
{code}
I would be happy to implement it if the proposal is accepted. Thanks
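For illustration, the effect of the proposed shading rule can be modeled as a prefix rewrite on class names. The following is a minimal Java sketch under assumptions, not the maven-shade-plugin's actual implementation: the class `RelocationSketch`, its `relocate` helper, and the two sample class names are hypothetical; only the two prefix values are taken from the proposed rule.

```java
import java.util.List;

/** Illustrative model only: rewrites class-name prefixes the way a relocation rule would. */
final class RelocationSketch {
    // Prefix values taken from the rule proposed in this issue.
    static final String PATTERN = "shaded.parquet";
    static final String SHADED_PATTERN = "shaded.parquet.flink.hive.shaded";

    /** Returns the relocated name if the class lives under PATTERN, else the name unchanged. */
    static String relocate(String className) {
        if (className.equals(PATTERN) || className.startsWith(PATTERN + ".")) {
            return SHADED_PATTERN + className.substring(PATTERN.length());
        }
        return className;
    }

    public static void main(String[] args) {
        // Hypothetical sample names: one under the shaded prefix, one outside it.
        List<String> names = List.of(
                "shaded.parquet.org.apache.thrift.TBase",
                "org.apache.parquet.hadoop.ParquetReader");
        for (String name : names) {
            System.out.println(name + " -> " + relocate(name));
        }
    }
}
```

Under this model, the Thrift 0.9.3 classes bundled via hive-exec would move to their own prefix and should no longer shadow the Thrift 0.16.0 classes that flink-parquet expects under `shaded.parquet`.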
[jira] [Comment Edited] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735717#comment-17735717 ]
dongwoo.kim edited comment on FLINK-32408 at 6/21/23 1:22 PM:
--
I think this is fixed in the new version, because in the main branch's pom.xml the Flink version was updated from *1.16.1* to *1.17.1*. But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports Flink 1.17, since it had its Flink version configured to 1.16.1.
was (Author: JIRAUSER300481):
I think this is fixed in the new version, because in the main branch's pom.xml the Flink version was updated from *1.16.1* to *1.17.1*. But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports Flink 1.17, since it had its Flink version set to 1.16.1.
> JobManager HA configuration update needed in Flink k8s Operator
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.5.0
> Reporter: dongwoo.kim
> Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
> The Flink 1.17 documentation says that to configure JobManager HA we have to set the *high-availability.type* key, not the *high-availability* key. (This seems to have changed in 1.17.)
> Currently, kubernetes-operator-1.5.0 says it supports Flink 1.17, so I expected that configuring JobManager HA with *high-availability.type* would work, but it didn't; only *high-availability* works.
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
[jira] [Comment Edited] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735717#comment-17735717 ]
dongwoo.kim edited comment on FLINK-32408 at 6/21/23 1:21 PM:
--
I think this is fixed in the new version, because in the main branch's pom.xml the Flink version was updated from *1.16.1* to *1.17.1*. But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports Flink 1.17, since it had its Flink version set to 1.16.1.
was (Author: JIRAUSER300481):
I think this is fixed in the new version, because in the main branch's pom.xml the Flink version was updated from *1.16.1* to *1.17.1*. But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports Flink 1.17.
> JobManager HA configuration update needed in Flink k8s Operator
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.5.0
> Reporter: dongwoo.kim
> Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
> The Flink 1.17 documentation says that to configure JobManager HA we have to set the *high-availability.type* key, not the *high-availability* key. (This seems to have changed in 1.17.)
> Currently, kubernetes-operator-1.5.0 says it supports Flink 1.17, so I expected that configuring JobManager HA with *high-availability.type* would work, but it didn't; only *high-availability* works.
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
[jira] [Commented] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735717#comment-17735717 ]
dongwoo.kim commented on FLINK-32408:
-
I think this is fixed in the new version, because in the main branch's pom.xml the Flink version was updated from *1.16.1* to *1.17.1*. But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports Flink 1.17.
> JobManager HA configuration update needed in Flink k8s Operator
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.5.0
> Reporter: dongwoo.kim
> Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
> The Flink 1.17 documentation says that to configure JobManager HA we have to set the *high-availability.type* key, not the *high-availability* key. (This seems to have changed in 1.17.)
> Currently, kubernetes-operator-1.5.0 says it supports Flink 1.17, so I expected that configuring JobManager HA with *high-availability.type* would work, but it didn't; only *high-availability* works.
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
[jira] [Updated] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
dongwoo.kim updated FLINK-32408:
Description:
The Flink 1.17 documentation says that to configure JobManager HA we have to set the *high-availability.type* key, not the *high-availability* key. (This seems to have changed in 1.17.)
Currently, kubernetes-operator-1.5.0 says it supports Flink 1.17, so I expected that configuring JobManager HA with *high-availability.type* would work, but it didn't; only *high-availability* works.
*ref*
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
was:
The Flink 1.17 documentation says that to configure JobManager HA we have to set the *high-availability.type* key, not the *high-availability* key. (This seems to have changed in 1.17.)
Currently, kubernetes-operator-1.5.0 says it supports Flink 1.17, so I expected that configuring JobManager HA with *high-availability.type* would work, but it didn't.
*ref*
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
> JobManager HA configuration update needed in Flink k8s Operator
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.5.0
> Reporter: dongwoo.kim
> Priority: Minor
> Fix For: kubernetes-operator-1.6.0
[jira] [Updated] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
[ https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
dongwoo.kim updated FLINK-32408:
Description:
The Flink 1.17 documentation says that to configure JobManager HA we have to set the *high-availability.type* key, not the *high-availability* key. (This seems to have changed in 1.17.)
Currently, kubernetes-operator-1.5.0 says it supports Flink 1.17, so I expected that configuring JobManager HA with *high-availability.type* would work, but it didn't.
*ref*
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
was:
The Flink 1.17 documentation says that to configure JobManager HA we have to set the *high-availability.type* key, not the *high-availability* key. (This seems to have changed in 1.17.)
Currently, kubernetes-operator-1.5.0 says it supports Flink 1.17, so I expected that configuring JobManager HA with *high-availability.type* would work, but it didn't.
ref:
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
> JobManager HA configuration update needed in Flink k8s Operator
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.5.0
> Reporter: dongwoo.kim
> Priority: Minor
> Fix For: kubernetes-operator-1.6.0
[jira] [Created] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator
dongwoo.kim created FLINK-32408:
---
Summary: JobManager HA configuration update needed in Flink k8s Operator
Key: FLINK-32408
URL: https://issues.apache.org/jira/browse/FLINK-32408
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: dongwoo.kim
Fix For: kubernetes-operator-1.6.0
The Flink 1.17 documentation says that to configure JobManager HA we have to set the *high-availability.type* key, not the *high-availability* key. (This seems to have changed in 1.17.)
Currently, kubernetes-operator-1.5.0 says it supports Flink 1.17, so I expected that configuring JobManager HA with *high-availability.type* would work, but it didn't.
ref:
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
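The reported behavior (only the older key taking effect) is what one would expect if the component reading the configuration knows only the *high-availability* key. Below is a minimal Java sketch of the two lookup strategies, assuming the configuration is a plain string map; the class `HaKeySketch`, both methods, and the sample value `kubernetes` are hypothetical and are not the operator's or Flink's actual code.

```java
import java.util.Map;
import java.util.Optional;

/** Illustrative model only: contrasts a legacy lookup with a lookup that accepts both keys. */
final class HaKeySketch {
    static final String NEW_KEY = "high-availability.type"; // key named in the Flink 1.17 docs
    static final String OLD_KEY = "high-availability";      // key that worked in the report

    /** Models a reader built against an older Flink version: only the old key is known. */
    static Optional<String> resolveLegacy(Map<String, String> conf) {
        return Optional.ofNullable(conf.get(OLD_KEY));
    }

    /** Models a reader that prefers the new key and falls back to the old one. */
    static Optional<String> resolveWithFallback(Map<String, String> conf) {
        String value = conf.get(NEW_KEY);
        if (value == null) {
            value = conf.get(OLD_KEY);
        }
        return Optional.ofNullable(value);
    }

    public static void main(String[] args) {
        // Hypothetical user configuration that sets only the new key.
        Map<String, String> conf = Map.of(NEW_KEY, "kubernetes");
        System.out.println("legacy lookup:   " + resolveLegacy(conf));
        System.out.println("fallback lookup: " + resolveWithFallback(conf));
    }
}
```

In this model, a configuration that sets only `high-availability.type` is invisible to the legacy lookup, which would match the symptom described in the issue.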