[jira] [Updated] (FLINK-35560) Support custom query operation validator for sql gateway

2024-07-15 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-35560:

Description: 
h3. Summary

Hello, I'd like to propose support for custom, pluggable query operation validators in the 
Flink SQL gateway.

As an SQL gateway operator, I see a need for operation validation so that only safe 
operations are executed and unsafe ones are rejected.
To address this need, I propose adding an {{OperationValidator}} interface to the 
Flink SQL gateway API package.
This interface will let users implement their own query operation validation logic, 
which benefits Flink SQL gateway operators.
h3. Interface

Below is a draft of the interface.
It takes an {{Operation}} and checks whether the query is valid.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.core.plugin.Plugin;
import org.apache.flink.table.operations.Operation;

import java.util.Optional;

public interface OperationValidator extends Plugin {

    /**
     * Validate a given operation and return an optional error string.
     *
     * @param op the operation to be validated.
     * @return an optional error string, present only if validation resulted in an error.
     */
    Optional<String> validate(Operation op);
}
{code}
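For illustration, here is a minimal sketch of how a gateway-side hook could consult the 
registered validators before executing an operation. The hook class, its constructor, and 
the plain exception are assumptions made for this example only; they are not part of the 
proposed API.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.operations.Operation;

import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of the proposal: runs every discovered validator
// against an operation and fails fast on the first reported error.
public final class OperationValidationHook {

    private final List<OperationValidator> validators;

    public OperationValidationHook(List<OperationValidator> validators) {
        this.validators = validators;
    }

    public void checkOrThrow(Operation op) {
        for (OperationValidator validator : validators) {
            Optional<String> error = validator.validate(op);
            if (error.isPresent()) {
                // A plain exception keeps the sketch self-contained; the real gateway
                // would likely translate this into its own error handling.
                throw new IllegalStateException(error.get());
            }
        }
    }
}
{code}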
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{scan.startup.timestamp-millis}}, and rejects the operation when the value 
is too far in the past, since reading from such an old offset can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

import java.util.Optional;

public class KafkaTableValidator implements OperationValidator {

    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

    @Override
    public Optional<String> validate(Operation op) {
        if (op instanceof CreateTableOperation) {
            final CreateTableOperation createTableOp = (CreateTableOperation) op;
            final String connector =
                    createTableOp.getCatalogTable().getOptions().get("connector");
            if ("kafka".equals(connector)) {
                try {
                    final long startupTimestampValue =
                            Long.parseLong(
                                    createTableOp
                                            .getCatalogTable()
                                            .getOptions()
                                            .get("scan.startup.timestamp-millis"));
                    if (startupTimestampValue < System.currentTimeMillis() - ONE_DAY) {
                        return Optional.of(
                                String.format(
                                        "Validation failed in %s: 'scan.startup.timestamp-millis' is too old. "
                                                + "Given value: %d. It must be within one day from the current time.",
                                        KafkaTableValidator.class.getName(),
                                        startupTimestampValue));
                    }
                } catch (NumberFormatException e) {
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }
}{code}
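For completeness, here is a sketch of how such a validator could be discovered. Assuming 
discovery follows the standard Java ServiceLoader convention that Flink's plugin mechanism 
builds on, the validator jar would ship a service file named after the interface; the 
snippet below only illustrates that convention and is not a description of the final wiring.
{code:java}
import org.apache.flink.table.gateway.api.validator.OperationValidator;

import java.util.ServiceLoader;

public class ValidatorDiscoveryExample {
    public static void main(String[] args) {
        // Assumes the validator jar contains
        // META-INF/services/org.apache.flink.table.gateway.api.validator.OperationValidator
        // with a single line: org.apache.flink.table.gateway.api.validator.KafkaTableValidator
        ServiceLoader<OperationValidator> validators =
                ServiceLoader.load(OperationValidator.class);
        validators.forEach(
                v -> System.out.println("Discovered validator: " + v.getClass().getName()));
    }
}
{code}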

I'm looking forward to getting feedback from the community.
Thanks

  was:
h3. Summary

Hello I'd like to suggest custom pluggable query operation validator support in 
flink sql gateway.

As an sql gateway operator, I feel there is a need for operation validation to 
ensure only safe operations are executed and unsafe operations are dropped.
To address this need, I propose adding a {{OperationValidator}} interface in 
flink sql gateway api package. 
This interface will allow users to implement their own query operation 
validation logic, providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

public interface OperationValidator extends Plugin {
/**
 * Validate a given operation and return an optional error string.
 *
 * @param op the operation to be validated.
 * @return Optional error string, should be present only if validation 
resulted in an error.
 */
Optional validate(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.OperationValidator;

[jira] [Updated] (FLINK-35560) Support custom query operation validator for sql gateway

2024-07-15 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-35560:

Description: 
h3. Summary

Hello I'd like to suggest custom pluggable query operation validator support in 
flink sql gateway.

As an sql gateway operator, I feel there is a need for operation validation to 
ensure only safe operations are executed and unsafe operations are dropped.
To address this need, I propose adding a {{OperationValidator}} interface in 
flink sql gateway api package. 
This interface will allow users to implement their own query operation 
validation logic, providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

public interface OperationValidator extends Plugin {
/**
 * Validate a given operation and return an optional error string.
 *
 * @param op the operation to be validated.
 * @return Optional error string, should be present only if validation 
resulted in an error.
 */
Optional validate(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.OperationValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTableValidator implements OperationValidator {
private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

@Override
public Optional validate(Operation op) {
if (op instanceof CreateTableOperation) {
final CreateTableOperation createTableOp = (CreateTableOperation) 
op;
final String connector = 
createTableOp.getCatalogTable().getOptions().get("connector");
if ("kafka".equals(connector)) {
try {
final long startupTimestampValue =
Long.parseLong(
createTableOp
.getCatalogTable()
.getOptions()

.get("scan.startup.timestamp-millis"));
if (startupTimestampValue < System.currentTimeMillis() - 
ONE_DAY) {
return Optional.of(
String.format(
"Validation failed in %s: 
'scan.startup.timestamp-millis' is too old. Given value: %d. It must be within 
one day from the current time.",
KafkaTableValidator.class.getName(),
startupTimestampValue));
}
} catch (NumberFormatException e) {
return Optional.empty();
}
}
}
return Optional.empty();
}
}{code}

I'm looking forward to get feedback from the community. 
Thanks

  was:
h3. Summary

Hello I'd like to suggest custom pluggable query operation validator support in 
flink sql gateway.

As an sql gateway operator, there is need for query operation validation to 
only execute safe queries and drop unsafe queries. 
To address this need, I propose adding a {{OperationValidator}} interface in 
flink sql gateway api package. 
This interface will allow users to implement their own query operation 
validation logic, providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

public interface OperationValidator extends Plugin {
/**
 * Validate a given operation and return an optional error string.
 *
 * @param op the operation to be validated.
 * @return Optional error string, should be present only if validation 
resulted in an error.
 */
Optional validate(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.OperationValidator;
import org.apache.flink.table.operations.Operation;

[jira] [Updated] (FLINK-35560) Support custom query operation validator for sql gateway

2024-07-15 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-35560:

Description: 
h3. Summary

Hello I'd like to suggest custom pluggable query operation validator support in 
flink sql gateway.

As an sql gateway operator, there is need for query operation validation to 
only execute safe queries and drop unsafe queries. 
To address this need, I propose adding a {{OperationValidator}} interface in 
flink sql gateway api package. 
This interface will allow users to implement their own query operation 
validation logic, providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

public interface OperationValidator extends Plugin {
/**
 * Validate a given operation and return an optional error string.
 *
 * @param op the operation to be validated.
 * @return Optional error string, should be present only if validation 
resulted in an error.
 */
Optional validate(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.OperationValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTableValidator implements OperationValidator {
private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

@Override
public Optional validate(Operation op) {
if (op instanceof CreateTableOperation) {
final CreateTableOperation createTableOp = (CreateTableOperation) 
op;
final String connector = 
createTableOp.getCatalogTable().getOptions().get("connector");
if ("kafka".equals(connector)) {
try {
final long startupTimestampValue =
Long.parseLong(
createTableOp
.getCatalogTable()
.getOptions()

.get("scan.startup.timestamp-millis"));
if (startupTimestampValue < System.currentTimeMillis() - 
ONE_DAY) {
return Optional.of(
String.format(
"Validation failed in %s: 
'scan.startup.timestamp-millis' is too old. Given value: %d. It must be within 
one day from the current time.",
KafkaTableValidator.class.getName(),
startupTimestampValue));
}
} catch (NumberFormatException e) {
return Optional.empty();
}
}
}
return Optional.empty();
}
}{code}

I'm looking forward to get feedback from the community. 
Thanks

  was:
h3. Summary

Hello I'd like to suggest query validator support in flink sql gateway via spi 
pattern.

As an sql gateway operator, there is need for query validation to only execute 
safe queries and drop unsafe queries. 
To address this need, I propose adding a {{QueryValidator}} interface in flink 
sql gateway api package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing 
queries.
 */
@Public
public interface QueryValidator {     
boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

private static final long ONE_DAY = 24 * 60 * 60 * 1000L; 

@Override

[jira] [Updated] (FLINK-35560) Support custom query operation validator for sql gateway

2024-07-15 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-35560:

Summary: Support custom query operation validator for sql gateway  (was: 
Add query validator support to flink sql gateway via spi pattern)

> Support custom query operation validator for sql gateway
> 
>
> Key: FLINK-35560
> URL: https://issues.apache.org/jira/browse/FLINK-35560
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Gateway
>Reporter: dongwoo.kim
>Priority: Major
>  Labels: pull-request-available
>
> h3. Summary
> Hello I'd like to suggest query validator support in flink sql gateway via 
> spi pattern.
> As an sql gateway operator, there is need for query validation to only 
> execute safe queries and drop unsafe queries. 
> To address this need, I propose adding a {{QueryValidator}} interface in 
> flink sql gateway api package. 
> This interface will allow users to implement their own query validation 
> logic, providing benefits to flink sql gateway operators.
> h3. Interface
> Below is a draft for the interface.
> It takes Operation and check whether the query is valid or not.
> {code:java}
> package org.apache.flink.table.gateway.api.validator;
> import org.apache.flink.annotation.Public;
> import org.apache.flink.table.operations.Operation;
> /**
>  * Interface for implementing a validator that checks the safety of executing 
> queries.
>  */
> @Public
> public interface QueryValidator {     
> boolean validateQuery(Operation op);
> }
> {code}
> h3. Example implementation
> Below is an example implementation that inspects Kafka table options, 
> specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
> is too small, which can cause high disk I/O load.
> {code:java}
> package org.apache.flink.table.gateway.api.validator;
> import org.apache.flink.table.gateway.api.validator.QueryValidator;
> import org.apache.flink.table.operations.Operation;
> import org.apache.flink.table.operations.ddl.CreateTableOperation;
> public class KafkaTimestampValidator implements QueryValidator {
> private static final long ONE_DAY = 24 * 60 * 60 * 1000L; 
> @Override
> public boolean validateQuery(Operation op) {
> if (op instanceof CreateTableOperation) {
> CreateTableOperation createTableOp = (CreateTableOperation) op;
> String connector = 
> createTableOp.getCatalogTable().getOptions().get("connector");
> if ("kafka".equals(connector)) {
> String startupTimestamp = 
> createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
> if (startupTimestamp != null && 
> Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
> return false;
> }
> }
> }
> return true;
> }
> }{code}
> I'd be happy to implement this feature, if we can reach on agreement. 
> Thanks.





[jira] [Comment Edited] (FLINK-35565) Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset

2024-06-30 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860998#comment-17860998
 ] 

dongwoo.kim edited comment on FLINK-35565 at 6/30/24 1:52 PM:
--

Since the stoppingOffset logic relies on the last record's offset, it can fall 
into an indefinite loop if all records are deleted. 
This [pr|https://github.com/apache/flink-connector-kafka/pull/100] can resolve 
the issue. Please take a look.


was (Author: JIRAUSER300481):
Since the stoppingOffset logic relies on the last record's offset, it can fall 
into an indefinite loop if all records are deleted. 
This [pr|https://github.com/apache/flink-connector-kafka/pull/100] resolves the 
issue. Please take a look.

> Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
> --
>
> Key: FLINK-35565
> URL: https://issues.apache.org/jira/browse/FLINK-35565
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
> Environment: This is reproduced on a *Flink 1.18.1* with the latest 
> Kafka connector 3.1.0-1.18 on a session cluster.
>Reporter: Naci Simsek
>Priority: Major
> Attachments: image-2024-06-11-11-19-09-889.png, 
> taskmanager_localhost_54489-ac092a_log.txt
>
>
> h2. Summary
> Flink batch job gets into an infinite fetch loop and could not gracefully 
> finish if the connected Kafka topic is empty and starting offset value in 
> Flink job is lower than the current start/end offset of the related topic. 
> See below for details:
> h2. How to reproduce
> Flink +*batch*+ job which works as a {*}KafkaSource{*}, will consume events 
> from Kafka topic.
> Related Kafka topic is empty, there are no events, and the offset value is as 
> below: *15*
> !image-2024-06-11-11-19-09-889.png|width=895,height=256!
>  
> Flink job uses a *specific starting offset* value, which is +*less*+ than the 
> current offset of the topic/partition.
> See below, it set as “4”
>  
> {code:java}
> package naci.grpId;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import 
> org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.kafka.common.TopicPartition;
> import java.util.HashMap;
> import java.util.Map;
> public class KafkaSource_Print {
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> // Define the specific offsets for the partitions
> Map<TopicPartition, Long> specificOffsets = new HashMap<>();
> specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // 
> Start from offset 4 for partition 0
> KafkaSource<String> kafkaSource = KafkaSource
> .builder()
> .setBootstrapServers("localhost:9093")  // Make sure the port 
> is correct
> .setTopics("topic_test")
> .setValueOnlyDeserializer(new SimpleStringSchema())
> 
> .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
> .setBounded(OffsetsInitializer.latest())
> .build();
> DataStream<String> stream = env.fromSource(
> kafkaSource,
> WatermarkStrategy.noWatermarks(),
> "Kafka Source"
> );
> stream.print();
> env.execute("Flink KafkaSource test job");
> }
> }{code}
>  
>  
> Here are the initial logs printed related to the offset, as soon as the job 
> gets submitted:
>  
> {code:java}
> 2024-05-30 12:15:50,010 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
> split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, 
> StoppingOffset: 15]]
> 2024-05-30 12:15:50,069 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, 
> StoppingOffset: 15]]]
> 2024-05-30 12:15:50,074 TRACE 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
> Seeking starting offsets to specified offsets: {topic_test-0=4}
> 2024-05-30 12:15:50,074 INFO  org.apache.kafka.clients.consumer.KafkaConsumer 
>  [] - [Consumer clientId=KafkaSource--2381765882724812354-0, 
> groupId=null] Seeking to offset 4 for partition topic_test-0
> 

[jira] [Commented] (FLINK-35565) Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset

2024-06-30 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860998#comment-17860998
 ] 

dongwoo.kim commented on FLINK-35565:
-

Since the stoppingOffset logic relies on the last record's offset, it can fall 
into an indefinite loop if all records are deleted. 
This [pr|https://github.com/apache/flink-connector-kafka/pull/100] resolves the 
issue. Please take a look.

> Flink KafkaSource Batch Job Gets Into Infinite Loop after Resetting Offset
> --
>
> Key: FLINK-35565
> URL: https://issues.apache.org/jira/browse/FLINK-35565
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
> Environment: This is reproduced on a *Flink 1.18.1* with the latest 
> Kafka connector 3.1.0-1.18 on a session cluster.
>Reporter: Naci Simsek
>Priority: Major
> Attachments: image-2024-06-11-11-19-09-889.png, 
> taskmanager_localhost_54489-ac092a_log.txt
>
>
> h2. Summary
> Flink batch job gets into an infinite fetch loop and could not gracefully 
> finish if the connected Kafka topic is empty and starting offset value in 
> Flink job is lower than the current start/end offset of the related topic. 
> See below for details:
> h2. How to reproduce
> Flink +*batch*+ job which works as a {*}KafkaSource{*}, will consume events 
> from Kafka topic.
> Related Kafka topic is empty, there are no events, and the offset value is as 
> below: *15*
> !image-2024-06-11-11-19-09-889.png|width=895,height=256!
>  
> Flink job uses a *specific starting offset* value, which is +*less*+ than the 
> current offset of the topic/partition.
> See below, it set as “4”
>  
> {code:java}
> package naci.grpId;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import 
> org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.kafka.common.TopicPartition;
> import java.util.HashMap;
> import java.util.Map;
> public class KafkaSource_Print {
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> // Define the specific offsets for the partitions
> Map<TopicPartition, Long> specificOffsets = new HashMap<>();
> specificOffsets.put(new TopicPartition("topic_test", 0), 4L); // 
> Start from offset 4 for partition 0
> KafkaSource<String> kafkaSource = KafkaSource
> .builder()
> .setBootstrapServers("localhost:9093")  // Make sure the port 
> is correct
> .setTopics("topic_test")
> .setValueOnlyDeserializer(new SimpleStringSchema())
> 
> .setStartingOffsets(OffsetsInitializer.offsets(specificOffsets))
> .setBounded(OffsetsInitializer.latest())
> .build();
> DataStream<String> stream = env.fromSource(
> kafkaSource,
> WatermarkStrategy.noWatermarks(),
> "Kafka Source"
> );
> stream.print();
> env.execute("Flink KafkaSource test job");
> }
> }{code}
>  
>  
> Here are the initial logs printed related to the offset, as soon as the job 
> gets submitted:
>  
> {code:java}
> 2024-05-30 12:15:50,010 INFO  
> org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding 
> split(s) to reader: [[Partition: topic_test-0, StartingOffset: 4, 
> StoppingOffset: 15]]
> 2024-05-30 12:15:50,069 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Prepare to run AddSplitsTask: [[[Partition: topic_test-0, StartingOffset: 4, 
> StoppingOffset: 15]]]
> 2024-05-30 12:15:50,074 TRACE 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
> Seeking starting offsets to specified offsets: {topic_test-0=4}
> 2024-05-30 12:15:50,074 INFO  org.apache.kafka.clients.consumer.KafkaConsumer 
>  [] - [Consumer clientId=KafkaSource--2381765882724812354-0, 
> groupId=null] Seeking to offset 4 for partition topic_test-0
> 2024-05-30 12:15:50,075 DEBUG 
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader [] - 
> SplitsChange handling result: [topic_test-0, start:4, stop: 15]
> 2024-05-30 12:15:50,075 DEBUG 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - 
> Finished running task AddSplitsTask: 

[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern

2024-06-09 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-35560:

Description: 
h3. Summary

Hello I'd like to suggest query validator support in flink sql gateway via spi 
pattern.

As an sql gateway operator, there is need for query validation to only execute 
safe queries and drop unsafe queries. 
To address this need, I propose adding a {{QueryValidator}} interface in flink 
sql gateway api package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing 
queries.
 */
@Public
public interface QueryValidator {     
boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

private static final long ONE_DAY = 24 * 60 * 60 * 1000L; 

@Override
public boolean validateQuery(Operation op) {
if (op instanceof CreateTableOperation) {
CreateTableOperation createTableOp = (CreateTableOperation) op;
String connector = 
createTableOp.getCatalogTable().getOptions().get("connector");
if ("kafka".equals(connector)) {
String startupTimestamp = 
createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
if (startupTimestamp != null && 
Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
return false;
}
}
}
return true;
}
}{code}


I'd be happy to implement this feature, if we can reach on agreement. 
Thanks.

  was:
h3. Summary

Hello I'd like to suggest query validator support in flink sql gateway via spi 
pattern.

As an sql gateway operator, there is need for query validation to only execute 
safe queries and drop unsafe queries. 
To address this need, I propose adding a {{QueryValidator}} interface in flink 
sql gateway api package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing 
queries.
 */
@Public
public interface QueryValidator {     
boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

private static final long ONE_DAY = 24 * 60 * 60 * 1000L; 

@Override
public boolean validateQuery(Operation op) {
if (op instanceof CreateTableOperation) {
CreateTableOperation createTableOp = (CreateTableOperation) op;
String connector = 
createTableOp.getCatalogTable().getOptions().get("connector");
if ("kafka".equals(connector)) {
String startupTimestamp = 
createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
if (startupTimestamp != null && 
Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
return false;
}
}
}
return true;
}
}{code}

I'd be happy to implement this feature, if we can reach on agreement. 
Thanks
h4.


> Add query validator support to flink sql gateway via spi pattern
> 

[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern

2024-06-09 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-35560:

Description: 
h3. Summary

Hello I'd like to suggest query validator support in flink sql gateway via spi 
pattern.

As an sql gateway operator, there is need for query validation to only execute 
safe queries and drop unsafe queries. 
To address this need, I propose adding a {{QueryValidator}} interface in flink 
sql gateway api package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing 
queries.
 */
@Public
public interface QueryValidator {     
boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

private static final long ONE_DAY = 24 * 60 * 60 * 1000L; 

@Override
public boolean validateQuery(Operation op) {
if (op instanceof CreateTableOperation) {
CreateTableOperation createTableOp = (CreateTableOperation) op;
String connector = 
createTableOp.getCatalogTable().getOptions().get("connector");
if ("kafka".equals(connector)) {
String startupTimestamp = 
createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
if (startupTimestamp != null && 
Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
return false;
}
}
}
}
return true;
}{code}

I'd be happy to implement this feature, if we can reach on agreement. 
Thanks
h4.

  was:
h3. Summary

Hello I'd like to suggest query validator support in flink sql gateway via spi 
pattern.

As an sql gateway operator, there is need for query validation to only execute 
safe queries and drop unsafe queries. 
To address this need, I propose adding a {{QueryValidator}} interface in flink 
sql gateway api package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing 
queries.
 */
@Public
public interface QueryValidator {     
boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:sql}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

private static final long ONE_DAY = 24 * 60 * 60 * 1000L; 

@Override
public boolean validateQuery(Operation op) {
if (op instanceof CreateTableOperation) {
CreateTableOperation createTableOp = (CreateTableOperation) op;
String connector = 
createTableOp.getCatalogTable().getOptions().get("connector");
if ("kafka".equals(connector)) {
String startupTimestamp = 
createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
if (startupTimestamp != null && 
Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
return false;
}
}
}
}
return true;
}{code}

I'd be happy to implement this feature, if we can reach on agreement. 
Thanks
h4.


> Add query validator support to flink sql gateway via spi pattern
> 

[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern

2024-06-09 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-35560:

Description: 
h3. Summary

Hello I'd like to suggest query validator support in flink sql gateway via spi 
pattern.

As an sql gateway operator, there is need for query validation to only execute 
safe queries and drop unsafe queries. 
To address this need, I propose adding a {{QueryValidator}} interface in flink 
sql gateway api package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing 
queries.
 */
@Public
public interface QueryValidator {     
boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

private static final long ONE_DAY = 24 * 60 * 60 * 1000L; 

@Override
public boolean validateQuery(Operation op) {
if (op instanceof CreateTableOperation) {
CreateTableOperation createTableOp = (CreateTableOperation) op;
String connector = 
createTableOp.getCatalogTable().getOptions().get("connector");
if ("kafka".equals(connector)) {
String startupTimestamp = 
createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
if (startupTimestamp != null && 
Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
return false;
}
}
}
return true;
}
}{code}

I'd be happy to implement this feature, if we can reach on agreement. 
Thanks
h4.

  was:
h3. Summary

Hello I'd like to suggest query validator support in flink sql gateway via spi 
pattern.

As an sql gateway operator, there is need for query validation to only execute 
safe queries and drop unsafe queries. 
To address this need, I propose adding a {{QueryValidator}} interface in flink 
sql gateway api package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing 
queries.
 */
@Public
public interface QueryValidator {     
boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

private static final long ONE_DAY = 24 * 60 * 60 * 1000L; 

@Override
public boolean validateQuery(Operation op) {
if (op instanceof CreateTableOperation) {
CreateTableOperation createTableOp = (CreateTableOperation) op;
String connector = 
createTableOp.getCatalogTable().getOptions().get("connector");
if ("kafka".equals(connector)) {
String startupTimestamp = 
createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
if (startupTimestamp != null && 
Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
return false;
}
}
}
}
return true;
}{code}

I'd be happy to implement this feature, if we can reach on agreement. 
Thanks
h4.


> Add query validator support to flink sql gateway via spi pattern
> 

[jira] [Created] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern

2024-06-09 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-35560:
---

 Summary: Add query validator support to flink sql gateway via spi 
pattern
 Key: FLINK-35560
 URL: https://issues.apache.org/jira/browse/FLINK-35560
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Gateway
Reporter: dongwoo.kim


h3. Summary

Hello I'd like to suggest query validator support in flink sql gateway via spi 
pattern.

As an sql gateway operator, there is need for query validation to only execute 
safe queries and drop unsafe queries. 
To address this need, I propose adding a {{QueryValidator}} interface in flink 
sql gateway api package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing 
queries.
 */
@Public
public interface QueryValidator {     
boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:sql}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

private static final long ONE_DAY = 24 * 60 * 60 * 1000L; 

@Override
public boolean validateQuery(Operation op) {
if (op instanceof CreateTableOperation) {
CreateTableOperation createTableOp = (CreateTableOperation) op;
String connector = 
createTableOp.getCatalogTable().getOptions().get("connector");
if ("kafka".equals(connector)) {
String startupTimestamp = 
createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
if (startupTimestamp != null && 
Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
return false;
}
}
}
}
return true;
}{code}

I'd be happy to implement this feature, if we can reach on agreement. 
Thanks
h4.





[jira] [Commented] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-06-01 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17851305#comment-17851305
 ] 

dongwoo.kim commented on FLINK-34470:
-

Hello [~m.orazow] thanks for the review. I've added integration test code for 
this case. 
I'd be happy to get feedback from added test codes. 
Thanks.

Best,
Dongwoo

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: dongwoo.kim
>Priority: Major
>  Labels: pull-request-available
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's 
> request timeouts after hanging. We can always reproduce this unexpected 
> behavior by following below steps.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy transactional producer and stop after producing certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple 
> query such as getting count of the produced messages
> 3. Flink sql job gets stucked and timeouts.
> h2. Cause
> Transaction producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. And these control messages are not polled by 
> {*}consumer.poll(){*}. (It is filtered internally). In 
> *KafkaPartitionSplitReader* code, split is finished only when 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non transactional messages or streaming environment but in some 
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
> recordsBySplits.setPartitionStoppingOffset(tp, 
> stoppingOffset);
> finishSplitAtRecord(
> tp,
> stoppingOffset,
> lastRecord.offset(),
> finishedPartitions,
> recordsBySplits);
> }
> {code}
> Replacing if condition to *consumer.position(tp) >= stoppingOffset* in 
> [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
>  can solve the problem. 
> *consumer.position(tp)* gets next record's offset if it exist and the last 
> record's offset if the next record doesn't exist. 
> By this KafkaPartitionSplitReader is available to finish the split even when 
> the stopping offset is configured to control record's offset. 
> I would be happy to implement about this fix if we can reach on agreement. 
> Thanks





[jira] [Commented] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-04-28 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841823#comment-17841823
 ] 

dongwoo.kim commented on FLINK-34470:
-

Hi [~m.orazow], 
I’ve just made a PR and would appreciate your review
I also left a comment about the metric issue in the discussion area and would 
appreciate any feedback on that.
Thanks

Best,
Dongwoo

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: dongwoo.kim
>Priority: Major
>  Labels: pull-request-available
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's 
> request timeouts after hanging. We can always reproduce this unexpected 
> behavior by following below steps.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy transactional producer and stop after producing certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple 
> query such as getting count of the produced messages
> 3. Flink sql job gets stucked and timeouts.
> h2. Cause
> Transaction producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. And these control messages are not polled by 
> {*}consumer.poll(){*}. (It is filtered internally). In 
> *KafkaPartitionSplitReader* code, split is finished only when 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non transactional messages or streaming environment but in some 
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
> recordsBySplits.setPartitionStoppingOffset(tp, 
> stoppingOffset);
> finishSplitAtRecord(
> tp,
> stoppingOffset,
> lastRecord.offset(),
> finishedPartitions,
> recordsBySplits);
> }
> {code}
> Replacing if condition to *consumer.position(tp) >= stoppingOffset* in 
> [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
>  can solve the problem. 
> *consumer.position(tp)* gets next record's offset if it exist and the last 
> record's offset if the next record doesn't exist. 
> By this KafkaPartitionSplitReader is available to finish the split even when 
> the stopping offset is configured to control record's offset. 
> I would be happy to implement about this fix if we can reach on agreement. 
> Thanks





[jira] [Comment Edited] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-04-14 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837027#comment-17837027
 ] 

dongwoo.kim edited comment on FLINK-34470 at 4/15/24 1:15 AM:
--

Hello [~m.orazow], 

I'm glad that you're interested in collaborating, I’ll send a pr soon and keep 
you updated.
Thanks for reaching out.

Best,

Dongwoo


was (Author: JIRAUSER300481):
Hello [~m.orazow], 

I'm glad that you're interested in collaborating, I’ll send a pr soon and keep 
you updated.
Thanks for reaching out.

Best regards,

Dongwoo

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's 
> request timeouts after hanging. We can always reproduce this unexpected 
> behavior by following below steps.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy transactional producer and stop after producing certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple 
> query such as getting count of the produced messages
> 3. Flink sql job gets stucked and timeouts.
> h2. Cause
> Transaction producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. And these control messages are not polled by 
> {*}consumer.poll(){*}. (It is filtered internally). In 
> *KafkaPartitionSplitReader* code, split is finished only when 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non transactional messages or streaming environment but in some 
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
> recordsBySplits.setPartitionStoppingOffset(tp, 
> stoppingOffset);
> finishSplitAtRecord(
> tp,
> stoppingOffset,
> lastRecord.offset(),
> finishedPartitions,
> recordsBySplits);
> }
> {code}
> Replacing if condition to *consumer.position(tp) >= stoppingOffset* in 
> [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
>  can solve the problem. 
> *consumer.position(tp)* gets next record's offset if it exist and the last 
> record's offset if the next record doesn't exist. 
> By this KafkaPartitionSplitReader is available to finish the split even when 
> the stopping offset is configured to control record's offset. 
> I would be happy to implement about this fix if we can reach on agreement. 
> Thanks





[jira] [Commented] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-04-14 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837027#comment-17837027
 ] 

dongwoo.kim commented on FLINK-34470:
-

Hello [~m.orazow], 

I'm glad that you're interested in collaborating, I’ll send a pr soon and keep 
you updated.
Thanks for reaching out.

Best regards,

Dongwoo

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's 
> request timeouts after hanging. We can always reproduce this unexpected 
> behavior by following below steps.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy transactional producer and stop after producing certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple 
> query such as getting count of the produced messages
> 3. Flink sql job gets stucked and timeouts.
> h2. Cause
> Transaction producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. And these control messages are not polled by 
> {*}consumer.poll(){*}. (It is filtered internally). In 
> *KafkaPartitionSplitReader* code, split is finished only when 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non transactional messages or streaming environment but in some 
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
> recordsBySplits.setPartitionStoppingOffset(tp, 
> stoppingOffset);
> finishSplitAtRecord(
> tp,
> stoppingOffset,
> lastRecord.offset(),
> finishedPartitions,
> recordsBySplits);
> }
> {code}
> Replacing if condition to *consumer.position(tp) >= stoppingOffset* in 
> [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
>  can solve the problem. 
> *consumer.position(tp)* gets next record's offset if it exist and the last 
> record's offset if the next record doesn't exist. 
> By this KafkaPartitionSplitReader is available to finish the split even when 
> the stopping offset is configured to control record's offset. 
> I would be happy to implement about this fix if we can reach on agreement. 
> Thanks





[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-20 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-34470:

Description: 
h2. Summary
Hi, we have faced an issue with transactional messages and the Table API Kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'*, the Flink SQL request hangs 
and eventually times out. We can always reproduce this unexpected behavior by following 
the steps below.
This is related to this [issue|https://issues.apache.org/jira/browse/FLINK-33484] as well.

h2. How to reproduce
1. Deploy a transactional producer and stop it after producing a certain amount of messages.
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple query, such as 
counting the produced messages.
3. The Flink SQL job gets stuck and times out.

h2. Cause
A transactional producer always writes [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of a transaction, 
and these control records are never returned by {*}consumer.poll(){*} (they are filtered 
out internally). In the *KafkaPartitionSplitReader* code, a split is finished only when the 
condition *lastRecord.offset() >= stoppingOffset - 1* is met. This works well with 
non-transactional messages or in a streaming environment, but in some batch use cases it 
causes unexpected hanging.

h2. Proposed solution
{code:java}
if (consumer.position(tp) >= stoppingOffset) {
    recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
    finishSplitAtRecord(
            tp,
            stoppingOffset,
            lastRecord.offset(),
            finishedPartitions,
            recordsBySplits);
}
{code}
Replacing the if condition with *consumer.position(tp) >= stoppingOffset* 
[here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
 can solve the problem. 
*consumer.position(tp)* returns the next record's offset if it exists, and the last 
record's offset if the next record doesn't exist. 
With this change, KafkaPartitionSplitReader is able to finish the split even when 
the stopping offset is configured to a control record's offset. 
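To illustrate the offset arithmetic, here is a rough sketch with a plain KafkaConsumer 
(read_committed isolation assumed; topic, partition and record count are placeholders), 
showing why the old condition never holds while the new one does once the consumer has 
read through the control batch:
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// 'consumer' is assumed to be a KafkaConsumer<String, String> configured with
// isolation.level=read_committed. Assume one committed transaction of 100 records at
// offsets 0..99; the control record sits at offset 100, so the latest offset is 101.
TopicPartition tp = new TopicPartition("repro-topic", 0);
consumer.assign(Collections.singletonList(tp));
consumer.seekToBeginning(Collections.singletonList(tp));

ConsumerRecords<String, String> polled = consumer.poll(Duration.ofSeconds(5));
List<ConsumerRecord<String, String>> forTp = polled.records(tp);
ConsumerRecord<String, String> lastRecord = forTp.get(forTp.size() - 1); // offset 99

long stoppingOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp); // 101

// Old condition: 99 >= 100 is false, so the split is never finished.
boolean oldCondition = lastRecord.offset() >= stoppingOffset - 1;
// New condition: the position advances past the filtered control record to 101,
// so 101 >= 101 holds and the split can be finished.
boolean newCondition = consumer.position(tp) >= stoppingOffset;
{code}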



I would be happy to implement this fix if we can reach an agreement. 
Thanks

  was:
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's request 
timeouts after hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2. How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code split is finished only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
{code:java}
if (consumer.position(tp) >= stoppingOffset) {
recordsBySplits.setPartitionStoppingOffset(tp, 
stoppingOffset);
finishSplitAtRecord(
tp,
stoppingOffset,
lastRecord.offset(),
finishedPartitions,
recordsBySplits);
}
{code}
Replacing if condition to *consumer.position(tp) >= stoppingOffset* in 
[here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
 can solve the problem. 
*consumer.position(tp)* gets next record's offset if it exist and the last 
record's offset if the next record doesn't exist. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 



I would be happy to implement about this fix if we can reach on agreement. 
Thanks


> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> 

[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-20 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-34470:

Description: 
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's request 
timeouts after hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2. How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code split is finished only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
{code:java}
if (consumer.position(tp) >= stoppingOffset) {
recordsBySplits.setPartitionStoppingOffset(tp, 
stoppingOffset);
finishSplitAtRecord(
tp,
stoppingOffset,
lastRecord.offset(),
finishedPartitions,
recordsBySplits);
}
{code}
Replacing if condition to *consumer.position(tp) >= stoppingOffset* in 
[here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
 can solve the problem. 
*consumer.position(tp)* gets next record's offset if it exist and the last 
record's offset if the next record doesn't exist. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 



I would be happy to implement about this fix if we can reach on agreement. 
Thanks

  was:
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's request 
timeouts after hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2. How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code split is finished only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
{code:java}
if (consumer.position(tp) >= stoppingOffset) {
recordsBySplits.setPartitionStoppingOffset(tp, 
stoppingOffset);
finishSplitAtRecord(
tp,
stoppingOffset,
lastRecord.offset(),
finishedPartitions,
recordsBySplits);
}
{code}
Replacing if condition to *consumer.position(tp) >= stoppingOffset* in 
[here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
 can solve the problem. 
*consumer.position(tp)* gets next record's offset if it exist and the last 
record's offset if the next record doesn't exist. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 


I would be happy to implement about this fix if we can reach on agreement. 
Thanks


> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> 

[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-20 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-34470:

Description: 
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's request 
timeouts after hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2. How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code split is finished only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
{code:java}
if (consumer.position(tp) >= stoppingOffset) {
recordsBySplits.setPartitionStoppingOffset(tp, 
stoppingOffset);
finishSplitAtRecord(
tp,
stoppingOffset,
lastRecord.offset(),
finishedPartitions,
recordsBySplits);
}
{code}
Replacing if condition to *consumer.position(tp) >= stoppingOffset* in 
[here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
 can solve the problem. 
*consumer.position(tp)* gets next record's offset if it exist and the last 
record's offset if the next record doesn't exist. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 


I would be happy to implement about this fix if we can reach on agreement. 
Thanks

  was:
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
timeouts after  hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2. How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code it finishes split only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement 
[here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
 can solve the problem. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 

I would be happy to implement about this fix if we can reach on agreement. 
Thanks


> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql's 
> request timeouts after hanging. We can always reproduce 

[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-19 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-34470:

Description: 
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
timeouts after  hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2. How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code it finishes split only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement 
[here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
 can solve the problem. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 

I would be happy to implement about this fix if we can reach on agreement. 
Thanks

  was:
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
timeouts after  hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2. How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code it finishes split only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 

I would be happy to implement about this fix if we can reach on agreement. 
Thanks


> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
> timeouts after  hanging. We can always reproduce this unexpected behavior by 
> following below steps.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy transactional producer and stop after producing certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple 
> query such as getting count of the produced messages
> 3. Flink sql job gets stucked and timeouts.
> h2. Cause
> Transaction producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. And these control messages are not polled by 
> {*}consumer.poll(){*}. (It is filtered internally). In 
> *KafkaPartitionSplitReader* code it finishes split only when 
> *lastRecord.offset() >= 

[jira] [Comment Edited] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-19 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818500#comment-17818500
 ] 

dongwoo.kim edited comment on FLINK-34470 at 2/19/24 3:12 PM:
--

[~martijnvisser] I have used the latest 
version (flink-sql-connector-kafka-3.1.0-1.18.jar) and verified that the issue still 
exists. This 
[line|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
 seems to be causing the issue.


was (Author: JIRAUSER300481):
[~martijnvisser] I have used latest 
version(flink-sql-connector-kafka-3.1.0-1.18.jar) and verified that issue still 
exists. This 
[line|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
 seems to be causing the issue

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
> timeouts after  hanging. We can always reproduce this unexpected behavior by 
> following below steps.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy transactional producer and stop after producing certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple 
> query such as getting count of the produced messages
> 3. Flink sql job gets stucked and timeouts.
> h2. Cause
> Transaction producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. And these control messages are not polled by 
> {*}consumer.poll(){*}. (It is filtered internally). In 
> *KafkaPartitionSplitReader* code it finishes split only when 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non transactional messages or streaming environment but in some 
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> Adding *consumer.position(tp) >= stoppingOffset* condition to the if 
> statement. 
> By this KafkaPartitionSplitReader is available to finish the split even when 
> the stopping offset is configured to control record's offset. 
> I would be happy to implement about this fix it we can reach on agreement. 
> Thanks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-19 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-34470:

Description: 
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
timeouts after  hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2. How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code it finishes split only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 

I would be happy to implement about this fix if we can reach on agreement. 
Thanks

  was:
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
timeouts after  hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2. How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code it finishes split only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 

I would be happy to implement about this fix it we can reach on agreement. 
Thanks


> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
> timeouts after  hanging. We can always reproduce this unexpected behavior by 
> following below steps.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy transactional producer and stop after producing certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple 
> query such as getting count of the produced messages
> 3. Flink sql job gets stucked and timeouts.
> h2. Cause
> Transaction producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. And these control messages are not polled by 
> {*}consumer.poll(){*}. (It is filtered internally). In 
> *KafkaPartitionSplitReader* code it finishes split only when 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non transactional messages or streaming environment but in some 
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> Adding *consumer.position(tp) >= 

[jira] [Commented] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-19 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818500#comment-17818500
 ] 

dongwoo.kim commented on FLINK-34470:
-

[~martijnvisser] I have used the latest 
version (flink-sql-connector-kafka-3.1.0-1.18.jar) and verified that the issue still 
exists. This 
[line|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
 seems to be causing the issue.

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
> timeouts after  hanging. We can always reproduce this unexpected behavior by 
> following below steps.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy transactional producer and stop after producing certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple 
> query such as getting count of the produced messages
> 3. Flink sql job gets stucked and timeouts.
> h2. Cause
> Transaction producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. And these control messages are not polled by 
> {*}consumer.poll(){*}. (It is filtered internally). In 
> *KafkaPartitionSplitReader* code it finishes split only when 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non transactional messages or streaming environment but in some 
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> Adding *consumer.position(tp) >= stoppingOffset* condition to the if 
> statement. 
> By this KafkaPartitionSplitReader is available to finish the split even when 
> the stopping offset is configured to control record's offset. 
> I would be happy to implement about this fix it we can reach on agreement. 
> Thanks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-19 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-34470:

Description: 
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
timeouts after  hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2. How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code it finishes split only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 

I would be happy to implement about this fix it we can reach on agreement. 
Thanks

  was:
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
timeouts after  hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2.  How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code it finishes split only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 

I would be happy to implement about this fix it we can reach on agreement. 
Thanks


> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
> timeouts after  hanging. We can always reproduce this unexpected behavior by 
> following below steps.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2. How to reproduce
> 1. Deploy transactional producer and stop after producing certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple 
> query such as getting count of the produced messages
> 3. Flink sql job gets stucked and timeouts.
> h2. Cause
> Transaction producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. And these control messages are not polled by 
> {*}consumer.poll(){*}. (It is filtered internally). In 
> *KafkaPartitionSplitReader* code it finishes split only when 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non transactional messages or streaming environment but in some 
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> Adding *consumer.position(tp) >= 

[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-19 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-34470:

Description: 
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
timeouts after  hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2.  How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code it finishes split only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 

I would be happy to implement about this fix it we can reach on agreement. 
Thanks

  was:
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
timeouts after  hanging. We can always reproduce this unexpected behavior by 
following below steps.


h2.  How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as count(*)
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these controll messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code it finishes split only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 


> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
> h2. Summary  
> Hi we have faced issue with transactional message and table api kafka source. 
> If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
> timeouts after  hanging. We can always reproduce this unexpected behavior by 
> following below steps.
> This is related to this 
> [issue|https://issues.apache.org/jira/browse/FLINK-33484] too.
> h2.  How to reproduce
> 1. Deploy transactional producer and stop after producing certain amount of 
> messages
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple 
> query such as getting count of the produced messages
> 3. Flink sql job gets stucked and timeouts.
> h2. Cause
> Transaction producer always produces [control 
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of 
> the transaction. And these control messages are not polled by 
> {*}consumer.poll(){*}. (It is filtered internally). In 
> *KafkaPartitionSplitReader* code it finishes split only when 
> *lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
> well with non transactional messages or streaming environment but in some 
> batch use cases it causes unexpected hanging.
> h2. Proposed solution
> Adding *consumer.position(tp) >= stoppingOffset* condition to the if 
> statement. 
> By this KafkaPartitionSplitReader is available to finish the split even when 
> the stopping offset is configured to control record's offset. 
> I would be 

[jira] [Created] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-19 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-34470:
---

 Summary: Transactional message + Table api kafka source with 
'latest-offset' scan bound mode causes indefinitely hanging
 Key: FLINK-34470
 URL: https://issues.apache.org/jira/browse/FLINK-34470
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.1
Reporter: dongwoo.kim


h2. Summary  
Hi, we have faced an issue with transactional messages and the Table API Kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'*, the Flink SQL request 
hangs and eventually times out. We can always reproduce this unexpected behavior by 
following the steps below.


h2. How to reproduce
1. Deploy a transactional producer and stop it after producing a certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple query 
such as count(*)
3. The Flink SQL job gets stuck and times out.

h2. Cause
A transactional producer always writes [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction, and these control records are never returned by 
{*}consumer.poll(){*} (they are filtered out internally). In the 
*KafkaPartitionSplitReader* code, a split is finished only when the condition 
*lastRecord.offset() >= stoppingOffset - 1* is met. This works well 
with non-transactional messages or in a streaming environment, but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
Add the condition *consumer.position(tp) >= stoppingOffset* to the if statement. 
With this change, KafkaPartitionSplitReader is able to finish the split even when 
the stopping offset is configured to a control record's offset. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33454) Adding tls configuration to IngressSpec

2023-12-13 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796210#comment-17796210
 ] 

dongwoo.kim commented on FLINK-33454:
-

[~ryanvanhuuksloot] sorry for the late reply, thanks for your efforts :)

> Adding tls configuration to IngressSpec
> ---
>
> Key: FLINK-33454
> URL: https://issues.apache.org/jira/browse/FLINK-33454
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: dongwoo.kim
>Assignee: Ryan van Huuksloot
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Hello, I want to propose new configuration parameter for IngressSpec.
> Currently flink k8s operator creates ingress resource as we define but it 
> doesn't support tls configuration to secure ingress. 
> How about adding tls parameter on IngressSpec?
> *IngressSpec*
> tls: IngressTLS
> *IngressTLSSpec*
> Hosts: List
> SecretName: String 
> If we could reach an agreement I'll be glad to take on the implementation.
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33715) Enhance history server to archive multiple histories per jobid

2023-11-30 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-33715:

Description: 
Hello Flink team,

I'd like to propose an improvement to how the job manager archives job 
histories and how the Flink history server fetches them. 

*Currently, only one job history per jobid can be archived and 
fetched.*  
When a Flink job tries to archive its history more than once, a 
'FileAlreadyExistsException' error usually happens.
This makes sense in most cases, since a job typically gets a new ID when it 
gets restarted from the latest checkpoint/savepoint.

 

*_However, there's a specific situation where this behavior can be 
problematic:_*

*_1) When we upgrade a job using the savepoint mode, the job's first history 
gets successfully archived._*
*_2) If the same job later fails due to an error, its history isn't archived 
again because there's already a record with the same job ID._*

This can be an issue because the most valuable information – why the job failed 
– gets lost.

 

To solve this simply, I suggest including currentTimeMillis in the history 
filename along with the jobid. ( \{jobid}-\{currentTimeMillis} )
On the history-fetching side, parse the jobid before the *"-"* delimiter 
and fetch all the histories for that jobid.
For the UI, we can keep the current display, or enhance it by adding an extra 
hierarchy level for each jobid, since each jobid can now have multiple histories.
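A rough sketch of the proposed naming and parsing (class and method names are 
illustrative, not the actual history server / archivist code):
{code:java}
import org.apache.flink.api.common.JobID;

public final class HistoryArchiveNames {

    // Archive file name: {jobid}-{currentTimeMillis}
    static String archiveName(JobID jobId) {
        return jobId.toString() + "-" + System.currentTimeMillis();
    }

    // A JobID string is 32 hex characters with no '-', so everything before the first
    // '-' is the jobid and everything after it is the archive timestamp.
    static String parseJobId(String archiveFileName) {
        int delimiter = archiveFileName.indexOf('-');
        return delimiter < 0 ? archiveFileName : archiveFileName.substring(0, delimiter);
    }
}
{code}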

 

If we could reach an agreement I'll be glad to take on the implementation.
Thanks in advance.

  was:
Hello Flink team,

I'd like to propose an improvement to how the job manager archives job 
histories and how flink history server fetches the history. 
Currently, only one job history per jobid is available to be archived and 
fectched.  
When a flink job tries to archive the job's history more than once, usually 
'FileAlreadyExistsException' error happens.
This makes sense in most cases, since a job typically gets a new ID when it 
gets restarted from latest checkpoint/savepoint.

However, there's a specific situation where this behavior can be problematic:

1) When we upgrade a job using the savepoint mode, the job's first history gets 
successfully archived.
2) If the same job later fails due to an error, its history isn't archived 
again because there's already a record with the same job ID.

This can be an issue because the most valuable information – why the job failed 
– gets lost.

To simply solve this, I suggest to include currentTimeMillis to the history 
filename along with jobid. ( \{jobid}-\{currentTimeMillis} )
And also in the history fetching side parse jobid before the *"-"* delimiter 
and fetch all the histories for that jobid.
For UI we can keep current display or maybe enhance with adding extra hierarchy 
for each jobid since each jobid can now have multiple histories.

If we could reach an agreement I'll be glad to take on the implementation.
Thanks in advance.


> Enhance history server to archive multiple histories per jobid
> --
>
> Key: FLINK-33715
> URL: https://issues.apache.org/jira/browse/FLINK-33715
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: dongwoo.kim
>Priority: Minor
>
> Hello Flink team,
> I'd like to propose an improvement to how the job manager archives job 
> histories and how flink history server fetches the history. 
> *Currently, only one job history per jobid is available to be archived and 
> fectched.*  
> When a flink job tries to archive the job's history more than once, usually 
> 'FileAlreadyExistsException' error happens.
> This makes sense in most cases, since a job typically gets a new ID when it 
> gets restarted from latest checkpoint/savepoint.
>  
> *_However, there's a specific situation where this behavior can be 
> problematic:_*
> *_1) When we upgrade a job using the savepoint mode, the job's first history 
> gets successfully archived._*
> *_2) If the same job later fails due to an error, its history isn't archived 
> again because there's already a record with the same job ID._*
> This can be an issue because the most valuable information – why the job 
> failed – gets lost.
>  
> To simply solve this, I suggest to include currentTimeMillis to the history 
> filename along with jobid. ( \{jobid}-\{currentTimeMillis} )
> And also in the history fetching side parse jobid before the *"-"* delimiter 
> and fetch all the histories for that jobid.
> For UI we can keep current display or maybe enhance with adding extra 
> hierarchy for each jobid since each jobid can now have multiple histories.
>  
> If we could reach an agreement I'll be glad to take on the implementation.
> Thanks in advance.



--
This message was sent by Atlassian Jira

[jira] [Created] (FLINK-33715) Enhance history server to archive multiple histories per jobid

2023-11-30 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-33715:
---

 Summary: Enhance history server to archive multiple histories per 
jobid
 Key: FLINK-33715
 URL: https://issues.apache.org/jira/browse/FLINK-33715
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: dongwoo.kim


Hello Flink team,

I'd like to propose an improvement to how the job manager archives job 
histories and how flink history server fetches the history. 
Currently, only one job history per jobid is available to be archived and 
fetched.  
When a flink job tries to archive the job's history more than once, usually 
'FileAlreadyExistsException' error happens.
This makes sense in most cases, since a job typically gets a new ID when it 
gets restarted from latest checkpoint/savepoint.

However, there's a specific situation where this behavior can be problematic:

1) When we upgrade a job using the savepoint mode, the job's first history gets 
successfully archived.
2) If the same job later fails due to an error, its history isn't archived 
again because there's already a record with the same job ID.

This can be an issue because the most valuable information – why the job failed 
– gets lost.

To simply solve this, I suggest to include currentTimeMillis to the history 
filename along with jobid. ( \{jobid}-\{currentTimeMillis} )
And also in the history fetching side parse jobid before the *"-"* delimiter 
and fetch all the histories for that jobid.
For UI we can keep current display or maybe enhance with adding extra hierarchy 
for each jobid since each jobid can now have multiple histories.

If we could reach an agreement I'll be glad to take on the implementation.
Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33454) Adding tls configuration to IngressSpec

2023-11-03 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-33454:

Description: 
Hello, I want to propose a new configuration parameter for IngressSpec.

Currently the Flink Kubernetes operator creates the ingress resource as we define it, 
but it doesn't support TLS configuration to secure the ingress. 

How about adding a tls parameter to IngressSpec?

*IngressSpec*
tls: IngressTLS

*IngressTLSSpec*
Hosts: List
SecretName: String 
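A rough sketch of the proposed shape (class and field names are illustrative, meant to 
follow the style of the existing spec classes in the operator):
{code:java}
import java.util.List;

// Added to IngressSpec:
// private IngressTLS tls;

public class IngressTLS {
    /** Hosts covered by the TLS certificate. */
    private List<String> hosts;
    /** Name of the Kubernetes secret holding the TLS certificate and key. */
    private String secretName;
}
{code}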

If we could reach an agreement I'll be glad to take on the implementation.
Thanks in advance.

  was:
Hello, I want to propose new configuration parameter for IngressSpec.



Currently flink k8s operator creates ingress resource as we define but it 
doesn't support tls configuration to secure ingress. 

How about adding tls parameter on IngressSpec?

*IngressSpec*
tls: IngressTLS

*IngressTLSSpec*
Hosts: List
SecretName: String 

If we could reach to agreement I'll be glad to take on the implementation.
Thanks in advance.


> Adding tls configuration to IngressSpec
> ---
>
> Key: FLINK-33454
> URL: https://issues.apache.org/jira/browse/FLINK-33454
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: dongwoo.kim
>Priority: Minor
>
> Hello, I want to propose new configuration parameter for IngressSpec.
> Currently flink k8s operator creates ingress resource as we define but it 
> doesn't support tls configuration to secure ingress. 
> How about adding tls parameter on IngressSpec?
> *IngressSpec*
> tls: IngressTLS
> *IngressTLSSpec*
> Hosts: List
> SecretName: String 
> If we could reach an agreement I'll be glad to take on the implementation.
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33454) Adding tls configuration to IngressSpec

2023-11-03 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-33454:
---

 Summary: Adding tls configuration to IngressSpec
 Key: FLINK-33454
 URL: https://issues.apache.org/jira/browse/FLINK-33454
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: dongwoo.kim


Hello, I want to propose new configuration parameter for IngressSpec.



Currently flink k8s operator creates ingress resource as we define but it 
doesn't support tls configuration to secure ingress. 

How about adding tls parameter on IngressSpec?

*IngressSpec*
tls: IngressTLS

*IngressTLSSpec*
Hosts: List
SecretName: String 

If we could reach an agreement I'll be glad to take on the implementation.
Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-27 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780439#comment-17780439
 ] 

dongwoo.kim edited comment on FLINK-33324 at 10/27/23 3:48 PM:
---

Thanks for the opinion [~roman], and sorry for the late reply. 
I've been thinking about ways to detect the slow restore issue in Flink, and 
here are a couple of methods I've considered

*1. Alerts Based on Checkpoint Failures.* 
Since Flink tries to trigger checkpoints even when not all subtasks are in the 
running state, they always fail.
While this indicates a possible issue, it doesn't specifically tell us whether it's 
due to slow recovery.

*2. Using the REST API*
Another approach could be to leverage Flink's REST API to check the status of 
subtasks. I found that even if some subtasks are still initializing their state, the 
*"/jobs/:jobid/status"* endpoint often shows them as {*}running{*}. This was 
unexpected. 
However, we can look at the *"/jobs/:jobid/vertices/:vertexid/taskmanagers"* 
endpoint to identify subtasks that are in the *initializing* state. 
A limitation here is that this endpoint only shows the current state and 
doesn't provide the duration for which a subtask has been in that state. 
So, some additional logic might be needed to track how long subtasks have been 
'stuck'; a rough sketch of this polling idea is below.
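A minimal sketch of that polling logic (host, port, ids and the naive substring check 
are placeholders; exception handling is omitted, and a real implementation would parse 
the JSON response and keep per-vertex timestamps):
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://jobmanager:8081/jobs/" + jobId
                        + "/vertices/" + vertexId + "/taskmanagers"))
        .GET()
        .build();
String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();

// Naive check: if any subtask is still restoring state, the response mentions INITIALIZING.
boolean stillInitializing = body.contains("INITIALIZING");
// To measure how long a subtask has been stuck, we would additionally record the first
// time INITIALIZING was observed for each vertex and compare against the current time.
{code}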

Considering the above, if we're looking to enhance Flink's observability, what 
about introducing a metric that shows restore time? 
Perhaps something named {*}lastDurationOfRestore{*}? WDYT?
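For example, a tiny sketch of how such a gauge could be exposed (the metric name and the 
surrounding wiring are assumptions, not existing Flink code):
{code:java}
// 'metricGroup' would be the task's MetricGroup and 'lastRestoreDurationMillis' a field
// updated once the state backend restore finishes.
metricGroup.gauge("lastDurationOfRestore", () -> lastRestoreDurationMillis);
{code}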

Thanks.


was (Author: JIRAUSER300481):
Thanks for the opnion [~roman] , and sorry for the late reply. 
I've been thinking about ways to detect the slow restore issue in Flink, and 
here are a couple of methods I've considered:

*1. Alerts Based on Checkpoint Failures.* 
Since flink tries to trigger checkpoint even though all the subtasks are not in 
running state it always fails.
While this indicates a possible issue, it doesn't specifically tell us if it's 
due to slow recovery

*2. Using the REST API*
Another approach could be to leveraging Flink's REST API to check the status of 
subtasks. I found that even if some subtasks are initializing state the 
*"/jobs/:jobid/status"* endpoint often shows them as {*}running{*}. This was 
unexpected. 
However, we can look at the *"/jobs/:jobid/vertices/:vertexid/taskmanagers"* 
endpoint to identify subtasks that are in the *initializing* state. 
A limitation here is that this endpoint just shows the current state and 
doesn't provide the duration for which a subtask has been in that state. 
So, some additional logic might be needed to track how long subtasks have been 
'stuck'.

Considering the above, if we're looking to enhance Flink's observability, what 
about introducing a metric that shows restore time? 
Perhaps something named {*}lastDurationOfRestore{*}? WDYT?

Thanks.

> Add flink managed timeout mechanism for backend restore operation
> -
>
> Key: FLINK-33324
> URL: https://issues.apache.org/jira/browse/FLINK-33324
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: dongwoo.kim
>Priority: Minor
> Attachments: image-2023-10-20-15-16-53-324.png, 
> image-2023-10-20-17-42-11-504.png
>
>
> Hello community, I would like to share an issue our team recently faced and 
> propose a feature to mitigate similar problems in the future.
> h2. Issue
> Our Flink streaming job encountered consecutive checkpoint failures and 
> subsequently attempted a restart. 
> This failure occurred due to timeouts in two subtasks located within the same 
> task manager. 
> The restore operation for this particular task manager also got stuck, 
> resulting in an "initializing" state lasting over an hour. 
> Once we realized the hang during the restore operation, we terminated the 
> task manager pod, resolving the issue.
> !image-2023-10-20-15-16-53-324.png|width=683,height=604!
> The sequence of events was as follows:
> 1. Checkpoint timed out for subtasks within the task manager, referred to as 
> tm-32.
> 2. The Flink job failed and initiated a restart.
> 3. Restoration was successful for 282 subtasks, but got stuck for the 2 
> subtasks in tm-32.
> 4. While the Flink tasks weren't fully in running state, checkpointing was 
> still being triggered, leading to consecutive checkpoint failures.
> 5. These checkpoint failures seemed to be ignored, and did not count to the 
> execution.checkpointing.tolerable-failed-checkpoints configuration. 
>      As a result, the job remained in the initialization phase for very long 
> period.
> 6. Once we found this, we terminated the tm-32 pod, leading to a successful 
> Flink job restart.
> h2. Suggestion
> I feel that, a Flink job remaining in the initializing state 

[jira] [Commented] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-27 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780439#comment-17780439
 ] 

dongwoo.kim commented on FLINK-33324:
-

Thanks for the opinion [~roman], and sorry for the late reply. 
I've been thinking about ways to detect the slow restore issue in Flink, and 
here are a couple of methods I've considered:

*1. Alerts Based on Checkpoint Failures.* 
Since flink tries to trigger checkpoint even though all the subtasks are not in 
running state it always fails.
While this indicates a possible issue, it doesn't specifically tell us if it's 
due to slow recovery

*2. Using the REST API*
Another approach could be to leveraging Flink's REST API to check the status of 
subtasks. I found that even if some subtasks are initializing state the 
*"/jobs/:jobid/status"* endpoint often shows them as {*}running{*}. This was 
unexpected. 
However, we can look at the *"/jobs/:jobid/vertices/:vertexid/taskmanagers"* 
endpoint to identify subtasks that are in the *initializing* state. 
A limitation here is that this endpoint just shows the current state and 
doesn't provide the duration for which a subtask has been in that state. 
So, some additional logic might be needed to track how long subtasks have been 
'stuck'.
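Just to illustrate what that extra tracking could look like, here is a minimal, untested sketch that polls the taskmanagers endpoint and remembers when a vertex first reported an initializing subtask. The class and package names and the naive string check on the response body are assumptions for illustration only, not a tested tool:
{code:java}
package com.example.monitoring; // hypothetical package, for illustration only

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch only: periodically poll
 * /jobs/:jobid/vertices/:vertexid/taskmanagers and track how long a vertex keeps
 * reporting an INITIALIZING subtask. The response layout and the keyword check
 * are assumptions, not verified against a real cluster.
 */
public class InitializingSubtaskTracker {

    private final HttpClient client = HttpClient.newHttpClient();
    private final Map<String, Instant> initializingSince = new HashMap<>();

    /** Returns how long the given vertex has been reporting INITIALIZING subtasks. */
    public Duration checkVertex(String restUrl, String jobId, String vertexId) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(restUrl + "/jobs/" + jobId + "/vertices/" + vertexId + "/taskmanagers"))
                .GET()
                .build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();

        // Naive check: the response mentions INITIALIZING while any subtask is still restoring.
        if (body.contains("INITIALIZING")) {
            initializingSince.putIfAbsent(vertexId, Instant.now());
            return Duration.between(initializingSince.get(vertexId), Instant.now());
        }
        initializingSince.remove(vertexId);
        return Duration.ZERO;
    }
}
{code}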

Considering the above, if we're looking to enhance Flink's observability, what 
about introducing a metric that shows restore time? 
Perhaps something named {*}lastDurationOfRestore{*}? WDYT?
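If that direction sounds reasonable, the metric could conceptually be a simple gauge registered around the restore path, roughly like the sketch below. The metric name comes from the suggestion above and does not exist in Flink today, and where exactly it would be registered is still an open question:
{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

/** Illustrative sketch: expose how long the last backend restore took. */
public class RestoreDurationMetric {

    private volatile long lastDurationOfRestoreMillis = -1L; // -1 = no restore observed yet

    public void register(MetricGroup taskMetricGroup) {
        // "lastDurationOfRestore" is the hypothetical name suggested above, not an existing Flink metric.
        taskMetricGroup.gauge("lastDurationOfRestore", (Gauge<Long>) () -> lastDurationOfRestoreMillis);
    }

    public void onRestoreFinished(long restoreStartMillis) {
        lastDurationOfRestoreMillis = System.currentTimeMillis() - restoreStartMillis;
    }
}
{code}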

Thanks.

> Add flink managed timeout mechanism for backend restore operation
> -
>
> Key: FLINK-33324
> URL: https://issues.apache.org/jira/browse/FLINK-33324
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: dongwoo.kim
>Priority: Minor
> Attachments: image-2023-10-20-15-16-53-324.png, 
> image-2023-10-20-17-42-11-504.png
>
>
> Hello community, I would like to share an issue our team recently faced and 
> propose a feature to mitigate similar problems in the future.
> h2. Issue
> Our Flink streaming job encountered consecutive checkpoint failures and 
> subsequently attempted a restart. 
> This failure occurred due to timeouts in two subtasks located within the same 
> task manager. 
> The restore operation for this particular task manager also got stuck, 
> resulting in an "initializing" state lasting over an hour. 
> Once we realized the hang during the restore operation, we terminated the 
> task manager pod, resolving the issue.
> !image-2023-10-20-15-16-53-324.png|width=683,height=604!
> The sequence of events was as follows:
> 1. Checkpoint timed out for subtasks within the task manager, referred to as 
> tm-32.
> 2. The Flink job failed and initiated a restart.
> 3. Restoration was successful for 282 subtasks, but got stuck for the 2 
> subtasks in tm-32.
> 4. While the Flink tasks weren't fully in running state, checkpointing was 
> still being triggered, leading to consecutive checkpoint failures.
> 5. These checkpoint failures seemed to be ignored, and did not count to the 
> execution.checkpointing.tolerable-failed-checkpoints configuration. 
>      As a result, the job remained in the initialization phase for very long 
> period.
> 6. Once we found this, we terminated the tm-32 pod, leading to a successful 
> Flink job restart.
> h2. Suggestion
> I feel that, a Flink job remaining in the initializing state indefinitely is 
> not ideal. 
> To enhance resilience, I think it would be helpful if we could add timeout 
> feature for restore operation. 
> If the restore operation exceeds a specified duration, an exception should be 
> thrown, causing the job to fail. 
> This way, we can address restore-related issues similarly to how we handle 
> checkpoint failures.
> h2. Notes
> Just to add, I've made a basic version of this feature to see if it works as 
> expected. 
> I've attached a picture from the Flink UI that shows the timeout exception 
> happened during restore operation. 
> It's just a start, but I hope it helps with our discussion. 
> (I've simulated network chaos, using 
> [litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts]
>  chaos engineering tool.)
> !image-2023-10-20-17-42-11-504.png|width=940,height=317!
>  
> Thank you for considering my proposal. I'm looking forward to hear your 
> thoughts. 
> If there's agreement on this, I'd be happy to work on implementing this 
> feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-20 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1755#comment-1755
 ] 

dongwoo.kim edited comment on FLINK-33324 at 10/20/23 1:25 PM:
---

Hi, [~pnowojski] 

Thanks for the opinion. 
First, about the code: I simply wrapped the main restore logic 
[here|https://github.com/apache/flink/blob/72e302310ba55bb5f35966ed448243aae36e193e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java#L94]
 in a callable object and combined it with future.get(timeout). Please consider 
that it was just an initial feasibility check, done without a deep dive into the 
Flink code.
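For reference, the prototype boiled down to something like the standalone sketch below; the executor handling, timeout value, and exception type are simplifications of what the actual change wrapped around the restore attempt, not the real patch:
{code:java}
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Standalone sketch of the prototype: run the restore attempt under a timeout. */
public class RestoreWithTimeout {

    public static <T> T runWithTimeout(Callable<T> restoreAttempt, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(restoreAttempt);
            // If the restore hangs longer than the timeout, fail instead of waiting forever.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new RuntimeException(
                    "Backend restore did not finish within " + timeoutMillis + " ms", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
{code}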

When manual human action is assumed, I agree that solving this issue with an 
alert system seems practical. 
However, our goal for handling the failover loop was to automate operations 
using the failure-rate restart strategy and a cronJob that monitors the Flink 
job's status. 
Instead of adding ambiguous conditions to the cronJob, treating an unusually 
long restore operation as a failure simplifies our process. 
Still, I understand from the feedback that this approach may fit our team's 
specific needs more than it would help everyone else.


was (Author: JIRAUSER300481):
Hi, [~pnowojski] 

Thanks for the opinion. 
First about the code, I just simply wrapped the main logic code 
[here|https://github.com/apache/flink/blob/72e302310ba55bb5f35966ed448243aae36e193e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java#L94]
 with callable object and combined with future.get(timeout). Please consider 
that it was just initial check for feasibility without a deep dive into the 
Flink code.

When considering manual action from human, I agree solving this issue with 
alert system seem practical. 
However, our goal for handling the failover loop was to automate operations 
using the failure-rate restart strategy and a cronJob that monitors the Flink 
job's status. 
Instead of adding complex conditions in the cronJob, treating an unusually long 
restore operation as a failure simplifies our process. 
Yet, I understand from the feedback that this approach might fit more to our 
team's unique needs and might not be as helpful for everyone else.

> Add flink managed timeout mechanism for backend restore operation
> -
>
> Key: FLINK-33324
> URL: https://issues.apache.org/jira/browse/FLINK-33324
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: dongwoo.kim
>Priority: Minor
> Attachments: image-2023-10-20-15-16-53-324.png, 
> image-2023-10-20-17-42-11-504.png
>
>
> Hello community, I would like to share an issue our team recently faced and 
> propose a feature to mitigate similar problems in the future.
> h2. Issue
> Our Flink streaming job encountered consecutive checkpoint failures and 
> subsequently attempted a restart. 
> This failure occurred due to timeouts in two subtasks located within the same 
> task manager. 
> The restore operation for this particular task manager also got stuck, 
> resulting in an "initializing" state lasting over an hour. 
> Once we realized the hang during the restore operation, we terminated the 
> task manager pod, resolving the issue.
> !image-2023-10-20-15-16-53-324.png|width=683,height=604!
> The sequence of events was as follows:
> 1. Checkpoint timed out for subtasks within the task manager, referred to as 
> tm-32.
> 2. The Flink job failed and initiated a restart.
> 3. Restoration was successful for 282 subtasks, but got stuck for the 2 
> subtasks in tm-32.
> 4. While the Flink tasks weren't fully in running state, checkpointing was 
> still being triggered, leading to consecutive checkpoint failures.
> 5. These checkpoint failures seemed to be ignored, and did not count to the 
> execution.checkpointing.tolerable-failed-checkpoints configuration. 
>      As a result, the job remained in the initialization phase for very long 
> period.
> 6. Once we found this, we terminated the tm-32 pod, leading to a successful 
> Flink job restart.
> h2. Suggestion
> I feel that, a Flink job remaining in the initializing state indefinitely is 
> not ideal. 
> To enhance resilience, I think it would be helpful if we could add timeout 
> feature for restore operation. 
> If the restore operation exceeds a specified duration, an exception should be 
> thrown, causing the job to fail. 
> This way, we can address restore-related issues similarly to how we handle 
> checkpoint failures.
> h2. Notes
> Just to add, I've made a basic version of this feature to see if it works as 
> expected. 
> I've attached a picture from the Flink UI that shows the timeout exception 
> 

[jira] [Comment Edited] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-20 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1755#comment-1755
 ] 

dongwoo.kim edited comment on FLINK-33324 at 10/20/23 1:24 PM:
---

Hi, [~pnowojski] 

Thanks for the opinion. 
First, about the code: I simply wrapped the main restore logic 
[here|https://github.com/apache/flink/blob/72e302310ba55bb5f35966ed448243aae36e193e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java#L94]
 in a callable object and combined it with future.get(timeout). Please consider 
that it was just an initial feasibility check, done without a deep dive into the 
Flink code.

When manual human action is assumed, I agree that solving this issue with an 
alert system seems practical. 
However, our goal for handling the failover loop was to automate operations 
using the failure-rate restart strategy and a cronJob that monitors the Flink 
job's status. 
Instead of adding complex conditions to the cronJob, treating an unusually long 
restore operation as a failure simplifies our process. 
Still, I understand from the feedback that this approach may fit our team's 
specific needs more than it would help everyone else.


was (Author: JIRAUSER300481):
Hi, [~pnowojski] 



Thanks for the opinion. 
First about the code, I just simply wrapped the main logic code 
[here|https://github.com/apache/flink/blob/72e302310ba55bb5f35966ed448243aae36e193e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java#L94]
 with callable object and combined with future.get(timeout). Please consider 
that it was just initial check for feasibility without a deep dive into the 
Flink code.

When considering manual action from human, solving this issue with alert system 
seem practical. 
However, our goal for handling the failover loop was to automate operations 
using the failure-rate restart strategy and a cronJob that monitors the Flink 
job's status. 
Instead of adding complex conditions in the cronJob, treating an unusually long 
restore operation as a failure simplifies our process. 
Yet, I understand from the feedback that this approach might fit more to our 
team's unique needs and might not be as helpful for everyone else.

> Add flink managed timeout mechanism for backend restore operation
> -
>
> Key: FLINK-33324
> URL: https://issues.apache.org/jira/browse/FLINK-33324
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: dongwoo.kim
>Priority: Minor
> Attachments: image-2023-10-20-15-16-53-324.png, 
> image-2023-10-20-17-42-11-504.png
>
>
> Hello community, I would like to share an issue our team recently faced and 
> propose a feature to mitigate similar problems in the future.
> h2. Issue
> Our Flink streaming job encountered consecutive checkpoint failures and 
> subsequently attempted a restart. 
> This failure occurred due to timeouts in two subtasks located within the same 
> task manager. 
> The restore operation for this particular task manager also got stuck, 
> resulting in an "initializing" state lasting over an hour. 
> Once we realized the hang during the restore operation, we terminated the 
> task manager pod, resolving the issue.
> !image-2023-10-20-15-16-53-324.png|width=683,height=604!
> The sequence of events was as follows:
> 1. Checkpoint timed out for subtasks within the task manager, referred to as 
> tm-32.
> 2. The Flink job failed and initiated a restart.
> 3. Restoration was successful for 282 subtasks, but got stuck for the 2 
> subtasks in tm-32.
> 4. While the Flink tasks weren't fully in running state, checkpointing was 
> still being triggered, leading to consecutive checkpoint failures.
> 5. These checkpoint failures seemed to be ignored, and did not count to the 
> execution.checkpointing.tolerable-failed-checkpoints configuration. 
>      As a result, the job remained in the initialization phase for very long 
> period.
> 6. Once we found this, we terminated the tm-32 pod, leading to a successful 
> Flink job restart.
> h2. Suggestion
> I feel that, a Flink job remaining in the initializing state indefinitely is 
> not ideal. 
> To enhance resilience, I think it would be helpful if we could add timeout 
> feature for restore operation. 
> If the restore operation exceeds a specified duration, an exception should be 
> thrown, causing the job to fail. 
> This way, we can address restore-related issues similarly to how we handle 
> checkpoint failures.
> h2. Notes
> Just to add, I've made a basic version of this feature to see if it works as 
> expected. 
> I've attached a picture from the Flink UI that shows the timeout exception 
> 

[jira] [Commented] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-20 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1757#comment-1757
 ] 

dongwoo.kim commented on FLINK-33324:
-

Hi, [~roman] 

Thanks for considering my suggestion.
For some background, we are leveraging the failure-rate restart strategy and a 
monitoring cronJob to manage the Flink application.


By marking a long-hanging restore operation as a failure, a retry can be initiated.

And if the retries don't resolve the issue, the job should ultimately fail.
This way, the cronJob monitoring the Flink application can quickly detect it 
and redeploy the job from its last state.

(In this scenario a new task manager pod is created, so this specific issue could 
be resolved.)

 

As you pointed out, introducing a timeout might have various side effects. 
Whether one can tolerate a lengthy restore or prefers quicker retries and 
redeployments might vary based on operational needs. 
What if we make this an optional feature? By default, there would be no 
timeout, but developers could configure it if desired.
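To make the opt-in idea concrete, such a knob could be declared roughly like the sketch below; the option key, default value, and description are purely hypothetical and are not part of Flink:
{code:java}
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Sketch of a hypothetical opt-in restore timeout option; not an existing Flink config. */
public class RestoreTimeoutOptions {

    /** Disabled by default (zero means "no timeout"), so existing behavior is unchanged. */
    public static final ConfigOption<Duration> BACKEND_RESTORE_TIMEOUT =
            ConfigOptions.key("state.backend.restore-timeout")
                    .durationType()
                    .defaultValue(Duration.ZERO)
                    .withDescription(
                            "Hypothetical option: maximum time a backend restore may take "
                                    + "before the task fails. Zero disables the timeout.");
}
{code}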

Thanks in advance.

> Add flink managed timeout mechanism for backend restore operation
> -
>
> Key: FLINK-33324
> URL: https://issues.apache.org/jira/browse/FLINK-33324
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: dongwoo.kim
>Priority: Minor
> Attachments: image-2023-10-20-15-16-53-324.png, 
> image-2023-10-20-17-42-11-504.png
>
>
> Hello community, I would like to share an issue our team recently faced and 
> propose a feature to mitigate similar problems in the future.
> h2. Issue
> Our Flink streaming job encountered consecutive checkpoint failures and 
> subsequently attempted a restart. 
> This failure occurred due to timeouts in two subtasks located within the same 
> task manager. 
> The restore operation for this particular task manager also got stuck, 
> resulting in an "initializing" state lasting over an hour. 
> Once we realized the hang during the restore operation, we terminated the 
> task manager pod, resolving the issue.
> !image-2023-10-20-15-16-53-324.png|width=683,height=604!
> The sequence of events was as follows:
> 1. Checkpoint timed out for subtasks within the task manager, referred to as 
> tm-32.
> 2. The Flink job failed and initiated a restart.
> 3. Restoration was successful for 282 subtasks, but got stuck for the 2 
> subtasks in tm-32.
> 4. While the Flink tasks weren't fully in running state, checkpointing was 
> still being triggered, leading to consecutive checkpoint failures.
> 5. These checkpoint failures seemed to be ignored, and did not count to the 
> execution.checkpointing.tolerable-failed-checkpoints configuration. 
>      As a result, the job remained in the initialization phase for very long 
> period.
> 6. Once we found this, we terminated the tm-32 pod, leading to a successful 
> Flink job restart.
> h2. Suggestion
> I feel that, a Flink job remaining in the initializing state indefinitely is 
> not ideal. 
> To enhance resilience, I think it would be helpful if we could add timeout 
> feature for restore operation. 
> If the restore operation exceeds a specified duration, an exception should be 
> thrown, causing the job to fail. 
> This way, we can address restore-related issues similarly to how we handle 
> checkpoint failures.
> h2. Notes
> Just to add, I've made a basic version of this feature to see if it works as 
> expected. 
> I've attached a picture from the Flink UI that shows the timeout exception 
> happened during restore operation. 
> It's just a start, but I hope it helps with our discussion. 
> (I've simulated network chaos, using 
> [litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts]
>  chaos engineering tool.)
> !image-2023-10-20-17-42-11-504.png|width=940,height=317!
>  
> Thank you for considering my proposal. I'm looking forward to hear your 
> thoughts. 
> If there's agreement on this, I'd be happy to work on implementing this 
> feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-20 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1756#comment-1756
 ] 

dongwoo.kim commented on FLINK-33324:
-

Hi, [~Zakelly] 



Thanks for sharing your opinion and experience.
Unfortunately, we failed to discover the root cause. 
However, we noticed that during the problem our HDFS wasn't under any unusual 
load; in fact, it was below its average. 
Additionally, our state size isn't particularly large.
All other task managers completed the restore process, except one that got stuck 
for over an hour.
After removing that specific task manager, everything went back to normal. 

Your feedback made me realize that enforcing a timeout for the restore operation 
might have huge side effects. 
Perhaps I was too focused on our specific situation. 

What if we keep the default setting without a timeout, but allow developers the 
option to set one if needed? 
Depending on their specific needs and SLOs, a long-hanging restore could be 
considered a failure. WDYT?

> Add flink managed timeout mechanism for backend restore operation
> -
>
> Key: FLINK-33324
> URL: https://issues.apache.org/jira/browse/FLINK-33324
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: dongwoo.kim
>Priority: Minor
> Attachments: image-2023-10-20-15-16-53-324.png, 
> image-2023-10-20-17-42-11-504.png
>
>
> Hello community, I would like to share an issue our team recently faced and 
> propose a feature to mitigate similar problems in the future.
> h2. Issue
> Our Flink streaming job encountered consecutive checkpoint failures and 
> subsequently attempted a restart. 
> This failure occurred due to timeouts in two subtasks located within the same 
> task manager. 
> The restore operation for this particular task manager also got stuck, 
> resulting in an "initializing" state lasting over an hour. 
> Once we realized the hang during the restore operation, we terminated the 
> task manager pod, resolving the issue.
> !image-2023-10-20-15-16-53-324.png|width=683,height=604!
> The sequence of events was as follows:
> 1. Checkpoint timed out for subtasks within the task manager, referred to as 
> tm-32.
> 2. The Flink job failed and initiated a restart.
> 3. Restoration was successful for 282 subtasks, but got stuck for the 2 
> subtasks in tm-32.
> 4. While the Flink tasks weren't fully in running state, checkpointing was 
> still being triggered, leading to consecutive checkpoint failures.
> 5. These checkpoint failures seemed to be ignored, and did not count to the 
> execution.checkpointing.tolerable-failed-checkpoints configuration. 
>      As a result, the job remained in the initialization phase for very long 
> period.
> 6. Once we found this, we terminated the tm-32 pod, leading to a successful 
> Flink job restart.
> h2. Suggestion
> I feel that, a Flink job remaining in the initializing state indefinitely is 
> not ideal. 
> To enhance resilience, I think it would be helpful if we could add timeout 
> feature for restore operation. 
> If the restore operation exceeds a specified duration, an exception should be 
> thrown, causing the job to fail. 
> This way, we can address restore-related issues similarly to how we handle 
> checkpoint failures.
> h2. Notes
> Just to add, I've made a basic version of this feature to see if it works as 
> expected. 
> I've attached a picture from the Flink UI that shows the timeout exception 
> happened during restore operation. 
> It's just a start, but I hope it helps with our discussion. 
> (I've simulated network chaos, using 
> [litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts]
>  chaos engineering tool.)
> !image-2023-10-20-17-42-11-504.png|width=940,height=317!
>  
> Thank you for considering my proposal. I'm looking forward to hear your 
> thoughts. 
> If there's agreement on this, I'd be happy to work on implementing this 
> feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-20 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1755#comment-1755
 ] 

dongwoo.kim commented on FLINK-33324:
-

Hi, [~pnowojski] 



Thanks for the opinion. 
First, about the code: I simply wrapped the main restore logic 
[here|https://github.com/apache/flink/blob/72e302310ba55bb5f35966ed448243aae36e193e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java#L94]
 in a callable object and combined it with future.get(timeout). Please consider 
that it was just an initial feasibility check, done without a deep dive into the 
Flink code.

When manual human action is assumed, solving this issue with an alert system 
seems practical. 
However, our goal for handling the failover loop was to automate operations 
using the failure-rate restart strategy and a cronJob that monitors the Flink 
job's status. 
Instead of adding complex conditions to the cronJob, treating an unusually long 
restore operation as a failure simplifies our process. 
Still, I understand from the feedback that this approach may fit our team's 
specific needs more than it would help everyone else.

> Add flink managed timeout mechanism for backend restore operation
> -
>
> Key: FLINK-33324
> URL: https://issues.apache.org/jira/browse/FLINK-33324
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: dongwoo.kim
>Priority: Minor
> Attachments: image-2023-10-20-15-16-53-324.png, 
> image-2023-10-20-17-42-11-504.png
>
>
> Hello community, I would like to share an issue our team recently faced and 
> propose a feature to mitigate similar problems in the future.
> h2. Issue
> Our Flink streaming job encountered consecutive checkpoint failures and 
> subsequently attempted a restart. 
> This failure occurred due to timeouts in two subtasks located within the same 
> task manager. 
> The restore operation for this particular task manager also got stuck, 
> resulting in an "initializing" state lasting over an hour. 
> Once we realized the hang during the restore operation, we terminated the 
> task manager pod, resolving the issue.
> !image-2023-10-20-15-16-53-324.png|width=683,height=604!
> The sequence of events was as follows:
> 1. Checkpoint timed out for subtasks within the task manager, referred to as 
> tm-32.
> 2. The Flink job failed and initiated a restart.
> 3. Restoration was successful for 282 subtasks, but got stuck for the 2 
> subtasks in tm-32.
> 4. While the Flink tasks weren't fully in running state, checkpointing was 
> still being triggered, leading to consecutive checkpoint failures.
> 5. These checkpoint failures seemed to be ignored, and did not count to the 
> execution.checkpointing.tolerable-failed-checkpoints configuration. 
>      As a result, the job remained in the initialization phase for very long 
> period.
> 6. Once we found this, we terminated the tm-32 pod, leading to a successful 
> Flink job restart.
> h2. Suggestion
> I feel that, a Flink job remaining in the initializing state indefinitely is 
> not ideal. 
> To enhance resilience, I think it would be helpful if we could add timeout 
> feature for restore operation. 
> If the restore operation exceeds a specified duration, an exception should be 
> thrown, causing the job to fail. 
> This way, we can address restore-related issues similarly to how we handle 
> checkpoint failures.
> h2. Notes
> Just to add, I've made a basic version of this feature to see if it works as 
> expected. 
> I've attached a picture from the Flink UI that shows the timeout exception 
> happened during restore operation. 
> It's just a start, but I hope it helps with our discussion. 
> (I've simulated network chaos, using 
> [litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts]
>  chaos engineering tool.)
> !image-2023-10-20-17-42-11-504.png|width=940,height=317!
>  
> Thank you for considering my proposal. I'm looking forward to hear your 
> thoughts. 
> If there's agreement on this, I'd be happy to work on implementing this 
> feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-20 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-33324:

Description: 
Hello community, I would like to share an issue our team recently faced and 
propose a feature to mitigate similar problems in the future.
h2. Issue

Our Flink streaming job encountered consecutive checkpoint failures and 
subsequently attempted a restart. 
This failure occurred due to timeouts in two subtasks located within the same 
task manager. 
The restore operation for this particular task manager also got stuck, 
resulting in an "initializing" state lasting over an hour. 
Once we realized the hang during the restore operation, we terminated the task 
manager pod, resolving the issue.

!image-2023-10-20-15-16-53-324.png|width=683,height=604!

The sequence of events was as follows:

1. Checkpoint timed out for subtasks within the task manager, referred to as 
tm-32.
2. The Flink job failed and initiated a restart.
3. Restoration was successful for 282 subtasks, but got stuck for the 2 
subtasks in tm-32.
4. While the Flink tasks weren't fully in running state, checkpointing was 
still being triggered, leading to consecutive checkpoint failures.
5. These checkpoint failures seemed to be ignored, and did not count toward the 
execution.checkpointing.tolerable-failed-checkpoints configuration. 
     As a result, the job remained in the initialization phase for a very long 
period.
6. Once we found this, we terminated the tm-32 pod, leading to a successful 
Flink job restart.
h2. Suggestion

I feel that a Flink job remaining in the initializing state indefinitely is 
not ideal. 
To enhance resilience, I think it would be helpful if we could add a timeout 
feature for the restore operation. 
If the restore operation exceeds a specified duration, an exception should be 
thrown, causing the job to fail. 
This way, we can address restore-related issues similarly to how we handle 
checkpoint failures.
h2. Notes

Just to add, I've made a basic version of this feature to see if it works as 
expected. 
I've attached a picture from the Flink UI that shows the timeout exception 
that happened during the restore operation. 
It's just a start, but I hope it helps with our discussion. 
(I've simulated network chaos using the 
[litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts]
 chaos engineering tool.)
!image-2023-10-20-17-42-11-504.png|width=940,height=317!

 

Thank you for considering my proposal. I'm looking forward to hearing your 
thoughts. 
If there's agreement on this, I'd be happy to work on implementing this feature.

  was:
Hello community, I would like to share an issue our team recently faced and 
propose a feature to mitigate similar problems in the future.
h2. Issue

Our Flink streaming job encountered consecutive checkpoint failures and 
subsequently attempted a restart. 
This failure occurred due to timeouts in two subtasks located within the same 
task manager. 
The restore operation for this particular task manager also got stuck, 
resulting in an "initializing" state lasting over an hour. 
Once we realized the hang during the restore operation, we terminated the task 
manager pod, resolving the issue.

!image-2023-10-20-15-16-53-324.png|width=565,height=500!

The sequence of events was as follows:

1. Checkpoint timed out for subtasks within the task manager, referred to as 
tm-32.
2. The Flink job failed and initiated a restart.
3. Restoration was successful for 282 subtasks, but got stuck for the 2 
subtasks in tm-32.
4. While the Flink tasks weren't fully in running state, checkpointing was 
still being triggered, leading to consecutive checkpoint failures.
5. These checkpoint failures seemed to be ignored, and did not count to the 
execution.checkpointing.tolerable-failed-checkpoints configuration. As a 
result, the job remained in the initialization phase for very long period.
6. Once we found this, we terminated the tm-32 pod, leading to a successful 
Flink job restart.
h2. Suggestion

I feel that, a Flink job remaining in the initializing state indefinitely is 
not ideal. 
To enhance resilience, I think it would be helpful if we could add timeout 
feature for restore operation. 
If the restore operation exceeds a specified duration, an exception should be 
thrown, causing the job to fail. 
This way, we can address restore-related issues similarly to how we handle 
checkpoint failures.
h2. Notes

Just to add, I've made a basic version of this feature to see if it works as 
expected. 
I've attached a picture from the Flink UI that shows the timeout exception 
happened during restore operation. 
It's just a start, but I hope it helps with our discussion. 
(I've simulated network chaos, using 
[litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts]
 chaos 

[jira] [Updated] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-20 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-33324:

Description: 
Hello community, I would like to share an issue our team recently faced and 
propose a feature to mitigate similar problems in the future.
h2. Issue

Our Flink streaming job encountered consecutive checkpoint failures and 
subsequently attempted a restart. 
This failure occurred due to timeouts in two subtasks located within the same 
task manager. 
The restore operation for this particular task manager also got stuck, 
resulting in an "initializing" state lasting over an hour. 
Once we realized the hang during the restore operation, we terminated the task 
manager pod, resolving the issue.

!image-2023-10-20-15-16-53-324.png|width=565,height=500!

The sequence of events was as follows:

1. Checkpoint timed out for subtasks within the task manager, referred to as 
tm-32.
2. The Flink job failed and initiated a restart.
3. Restoration was successful for 282 subtasks, but got stuck for the 2 
subtasks in tm-32.
4. While the Flink tasks weren't fully in running state, checkpointing was 
still being triggered, leading to consecutive checkpoint failures.
5. These checkpoint failures seemed to be ignored, and did not count to the 
execution.checkpointing.tolerable-failed-checkpoints configuration. As a 
result, the job remained in the initialization phase for very long period.
6. Once we found this, we terminated the tm-32 pod, leading to a successful 
Flink job restart.
h2. Suggestion

I feel that, a Flink job remaining in the initializing state indefinitely is 
not ideal. 
To enhance resilience, I think it would be helpful if we could add timeout 
feature for restore operation. 
If the restore operation exceeds a specified duration, an exception should be 
thrown, causing the job to fail. 
This way, we can address restore-related issues similarly to how we handle 
checkpoint failures.
h2. Notes

Just to add, I've made a basic version of this feature to see if it works as 
expected. 
I've attached a picture from the Flink UI that shows the timeout exception 
happened during restore operation. 
It's just a start, but I hope it helps with our discussion. 
(I've simulated network chaos, using 
[litmus|https://litmuschaos.github.io/litmus/experiments/categories/pods/pod-network-latency/#destination-ips-and-destination-hosts]
 chaos engineering tool.)
!image-2023-10-20-17-42-11-504.png|width=1121,height=378!

 

Thank you for considering my proposal. I'm looking forward to hear your 
thoughts. 
If there's agreement on this, I'd be happy to work on implementing this feature.

  was:
Hello community, I would like to share an issue our team recently faced and 
propose a feature to mitigate similar problems in the future.
h2. Issue

Our Flink streaming job encountered consecutive checkpoint failures and 
subsequently attempted a restart. 
This failure occurred due to timeouts in two subtasks located within the same 
task manager. 
The restore operation for this particular task manager also got stuck, 
resulting in an "initializing" state lasting over an hour. 
Once we realized the hang during the restore operation, we terminated the task 
manager pod, resolving the issue.

!image-2023-10-20-15-16-53-324.png|width=565,height=500!

The sequence of events was as follows:

1. Checkpoint timed out for subtasks within the task manager, referred to as 
tm-32.
2. The Flink job failed and initiated a restart.
3. Restoration was successful for 282 subtasks, but got stuck for the 2 
subtasks in tm-32.
4. While the Flink tasks weren't fully in running state, checkpointing was 
still being triggered, leading to consecutive checkpoint failures.
5. These checkpoint failures seemed to be ignored, and did not count to the 
execution.checkpointing.tolerable-failed-checkpoints configuration. As a 
result, the job remained in the initialization phase for very long period.
6. Once we found this, we terminated the tm-32 pod, leading to a successful 
Flink job restart.
h2. Suggestion

I feel that, a Flink job remaining in the initializing state indefinitely is 
not ideal. 
To enhance resilience, I think it would be helpful if we could add timeout 
feature for restore operation. 
If the restore operation exceeds a specified duration, an exception should be 
thrown, causing the job to fail. 
This way, we can address restore-related issues similarly to how we handle 
checkpoint failures.
h2. Notes

Just to add, I've made a basic version of this feature to see if it works. 
I've attached a picture from the Flink UI that shows the timeout exception 
happened during restore operation. 
It's just a start, but I hope it helps with our discussion. 
I've simulated network chaos, using litmus chaos engineering tool.
!image-2023-10-20-17-30-10-751.png|width=839,height=283!

Thank you for considering my proposal. I'm looking forward to hear 

[jira] [Updated] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-20 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-33324:

Description: 
Hello community, I would like to share an issue our team recently faced and 
propose a feature to mitigate similar problems in the future.
h2. Issue

Our Flink streaming job encountered consecutive checkpoint failures and 
subsequently attempted a restart. 
This failure occurred due to timeouts in two subtasks located within the same 
task manager. 
The restore operation for this particular task manager also got stuck, 
resulting in an "initializing" state lasting over an hour. 
Once we realized the hang during the restore operation, we terminated the task 
manager pod, resolving the issue.

!image-2023-10-20-15-16-53-324.png|width=565,height=500!

The sequence of events was as follows:

1. Checkpoint timed out for subtasks within the task manager, referred to as 
tm-32.
2. The Flink job failed and initiated a restart.
3. Restoration was successful for 282 subtasks, but got stuck for the 2 
subtasks in tm-32.
4. While the Flink tasks weren't fully in running state, checkpointing was 
still being triggered, leading to consecutive checkpoint failures.
5. These checkpoint failures seemed to be ignored, and did not count to the 
execution.checkpointing.tolerable-failed-checkpoints configuration. As a 
result, the job remained in the initialization phase for very long period.
6. Once we found this, we terminated the tm-32 pod, leading to a successful 
Flink job restart.
h2. Suggestion

I feel that, a Flink job remaining in the initializing state indefinitely is 
not ideal. 
To enhance resilience, I think it would be helpful if we could add timeout 
feature for restore operation. 
If the restore operation exceeds a specified duration, an exception should be 
thrown, causing the job to fail. 
This way, we can address restore-related issues similarly to how we handle 
checkpoint failures.
h2. Notes

Just to add, I've made a basic version of this feature to see if it works. 
I've attached a picture from the Flink UI that shows the timeout exception 
happened during restore operation. 
It's just a start, but I hope it helps with our discussion. 
I've simulated network chaos, using litmus chaos engineering tool.
!image-2023-10-20-17-30-10-751.png|width=839,height=283!

Thank you for considering my proposal. I'm looking forward to hear your 
thoughts. 
If there's agreement on this, I'd be happy to work on implementing this feature.

  was:
Hello community, I would like to share an issue our team recently faced and 
propose a feature to mitigate similar problems in the future.
h2. Issue

Our Flink streaming job encountered consecutive checkpoint failures and 
subsequently attempted a restart. 
This failure occurred due to timeouts in two subtasks located within the same 
task manager. 
The restore operation for this particular task manager also got stuck, 
resulting in an "initializing" state lasting over an hour. 
Once we realized the hang during the restore operation, we terminated the task 
manager pod, resolving the issue.

!image-2023-10-20-15-16-53-324.png|width=847,height=749!

The sequence of events was as follows:

1. Checkpoint timed out for subtasks within the task manager, referred to as 
tm-32.
2. The Flink job failed and initiated a restart.
3. Restoration was successful for 282 subtasks, but got stuck for the 2 
subtasks in tm-32.
4. While the Flink tasks weren't fully in running state, checkpointing was 
still being triggered, leading to consecutive checkpoint failures.
5. These checkpoint failures seemed to be ignored, and did not count to the 
execution.checkpointing.tolerable-failed-checkpoints configuration. As a 
result, the job remained in the initialization phase for very long period.
6. Once we found this, we terminated the tm-32 pod, leading to a successful 
Flink job restart.
h2. Suggestion

I feel that, a Flink job remaining in the initializing state indefinitely is 
not ideal. 
To enhance resilience, I think it would be helpful if we could add timeout 
feature for restore operation. 
If the restore operation exceeds a specified duration, an exception should be 
thrown, causing the job to fail. 
This way, we can address restore-related issues similarly to how we handle 
checkpoint failures.
h2. Notes

Just to add, I've made a basic version of this feature to see if it works. 
I've attached a picture from the Flink UI that shows the timeout exception 
happened during restore operation. 
It's just a start, but I hope it helps with our discussion. 
I've simulated network chaos, using litmus chaos engineering tool.
!image-2023-10-20-17-30-10-751.png|width=839,height=283!

Thank you for considering my proposal. I'm looking forward to hear your 
thoughts. 
If there's agreement on this, I'd be happy to work on implementing this feature.


> Add flink managed timeout mechanism for backend 

[jira] [Updated] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-20 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-33324:

Attachment: (was: image-2023-10-20-17-30-02-290.png)

> Add flink managed timeout mechanism for backend restore operation
> -
>
> Key: FLINK-33324
> URL: https://issues.apache.org/jira/browse/FLINK-33324
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: dongwoo.kim
>Priority: Minor
> Attachments: image-2023-10-20-15-16-53-324.png, 
> image-2023-10-20-17-42-11-504.png
>
>
> Hello community, I would like to share an issue our team recently faced and 
> propose a feature to mitigate similar problems in the future.
> h2. Issue
> Our Flink streaming job encountered consecutive checkpoint failures and 
> subsequently attempted a restart. 
> This failure occurred due to timeouts in two subtasks located within the same 
> task manager. 
> The restore operation for this particular task manager also got stuck, 
> resulting in an "initializing" state lasting over an hour. 
> Once we realized the hang during the restore operation, we terminated the 
> task manager pod, resolving the issue.
> !image-2023-10-20-15-16-53-324.png|width=565,height=500!
> The sequence of events was as follows:
> 1. Checkpoint timed out for subtasks within the task manager, referred to as 
> tm-32.
> 2. The Flink job failed and initiated a restart.
> 3. Restoration was successful for 282 subtasks, but got stuck for the 2 
> subtasks in tm-32.
> 4. While the Flink tasks weren't fully in running state, checkpointing was 
> still being triggered, leading to consecutive checkpoint failures.
> 5. These checkpoint failures seemed to be ignored, and did not count to the 
> execution.checkpointing.tolerable-failed-checkpoints configuration. As a 
> result, the job remained in the initialization phase for very long period.
> 6. Once we found this, we terminated the tm-32 pod, leading to a successful 
> Flink job restart.
> h2. Suggestion
> I feel that, a Flink job remaining in the initializing state indefinitely is 
> not ideal. 
> To enhance resilience, I think it would be helpful if we could add timeout 
> feature for restore operation. 
> If the restore operation exceeds a specified duration, an exception should be 
> thrown, causing the job to fail. 
> This way, we can address restore-related issues similarly to how we handle 
> checkpoint failures.
> h2. Notes
> Just to add, I've made a basic version of this feature to see if it works. 
> I've attached a picture from the Flink UI that shows the timeout exception 
> happened during restore operation. 
> It's just a start, but I hope it helps with our discussion. 
> I've simulated network chaos, using litmus chaos engineering tool.
> !image-2023-10-20-17-30-10-751.png|width=839,height=283!
> Thank you for considering my proposal. I'm looking forward to hear your 
> thoughts. 
> If there's agreement on this, I'd be happy to work on implementing this 
> feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-20 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-33324:

Attachment: image-2023-10-20-17-42-11-504.png

> Add flink managed timeout mechanism for backend restore operation
> -
>
> Key: FLINK-33324
> URL: https://issues.apache.org/jira/browse/FLINK-33324
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: dongwoo.kim
>Priority: Minor
> Attachments: image-2023-10-20-15-16-53-324.png, 
> image-2023-10-20-17-42-11-504.png
>
>
> Hello community, I would like to share an issue our team recently faced and 
> propose a feature to mitigate similar problems in the future.
> h2. Issue
> Our Flink streaming job encountered consecutive checkpoint failures and 
> subsequently attempted a restart. 
> This failure occurred due to timeouts in two subtasks located within the same 
> task manager. 
> The restore operation for this particular task manager also got stuck, 
> resulting in an "initializing" state lasting over an hour. 
> Once we realized the hang during the restore operation, we terminated the 
> task manager pod, resolving the issue.
> !image-2023-10-20-15-16-53-324.png|width=565,height=500!
> The sequence of events was as follows:
> 1. Checkpoint timed out for subtasks within the task manager, referred to as 
> tm-32.
> 2. The Flink job failed and initiated a restart.
> 3. Restoration was successful for 282 subtasks, but got stuck for the 2 
> subtasks in tm-32.
> 4. While the Flink tasks weren't fully in running state, checkpointing was 
> still being triggered, leading to consecutive checkpoint failures.
> 5. These checkpoint failures seemed to be ignored, and did not count to the 
> execution.checkpointing.tolerable-failed-checkpoints configuration. As a 
> result, the job remained in the initialization phase for very long period.
> 6. Once we found this, we terminated the tm-32 pod, leading to a successful 
> Flink job restart.
> h2. Suggestion
> I feel that, a Flink job remaining in the initializing state indefinitely is 
> not ideal. 
> To enhance resilience, I think it would be helpful if we could add timeout 
> feature for restore operation. 
> If the restore operation exceeds a specified duration, an exception should be 
> thrown, causing the job to fail. 
> This way, we can address restore-related issues similarly to how we handle 
> checkpoint failures.
> h2. Notes
> Just to add, I've made a basic version of this feature to see if it works. 
> I've attached a picture from the Flink UI that shows the timeout exception 
> happened during restore operation. 
> It's just a start, but I hope it helps with our discussion. 
> I've simulated network chaos, using litmus chaos engineering tool.
> !image-2023-10-20-17-30-10-751.png|width=839,height=283!
> Thank you for considering my proposal. I'm looking forward to hear your 
> thoughts. 
> If there's agreement on this, I'd be happy to work on implementing this 
> feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33324) Add flink managed timeout mechanism for backend restore operation

2023-10-20 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-33324:
---

 Summary: Add flink managed timeout mechanism for backend restore 
operation
 Key: FLINK-33324
 URL: https://issues.apache.org/jira/browse/FLINK-33324
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: dongwoo.kim
 Attachments: image-2023-10-20-15-16-53-324.png, 
image-2023-10-20-17-30-02-290.png

Hello community, I would like to share an issue our team recently faced and 
propose a feature to mitigate similar problems in the future.
h2. Issue

Our Flink streaming job encountered consecutive checkpoint failures and 
subsequently attempted a restart. 
This failure occurred due to timeouts in two subtasks located within the same 
task manager. 
The restore operation for this particular task manager also got stuck, 
resulting in an "initializing" state lasting over an hour. 
Once we realized the hang during the restore operation, we terminated the task 
manager pod, resolving the issue.

!image-2023-10-20-15-16-53-324.png|width=847,height=749!

The sequence of events was as follows:

1. Checkpoint timed out for subtasks within the task manager, referred to as 
tm-32.
2. The Flink job failed and initiated a restart.
3. Restoration was successful for 282 subtasks, but got stuck for the 2 
subtasks in tm-32.
4. While the Flink tasks weren't fully in running state, checkpointing was 
still being triggered, leading to consecutive checkpoint failures.
5. These checkpoint failures seemed to be ignored, and did not count to the 
execution.checkpointing.tolerable-failed-checkpoints configuration. As a 
result, the job remained in the initialization phase for very long period.
6. Once we found this, we terminated the tm-32 pod, leading to a successful 
Flink job restart.
h2. Suggestion

I feel that, a Flink job remaining in the initializing state indefinitely is 
not ideal. 
To enhance resilience, I think it would be helpful if we could add timeout 
feature for restore operation. 
If the restore operation exceeds a specified duration, an exception should be 
thrown, causing the job to fail. 
This way, we can address restore-related issues similarly to how we handle 
checkpoint failures.
h2. Notes

Just to add, I've made a basic version of this feature to see if it works. 
I've attached a picture from the Flink UI that shows the timeout exception 
happened during restore operation. 
It's just a start, but I hope it helps with our discussion. 
I've simulated network chaos, using litmus chaos engineering tool.
!image-2023-10-20-17-30-10-751.png|width=839,height=283!

Thank you for considering my proposal. I'm looking forward to hear your 
thoughts. 
If there's agreement on this, I'd be happy to work on implementing this feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33066) Enable to inject environment variable from secret/configmap to operatorPod

2023-09-12 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17764455#comment-17764455
 ] 

dongwoo.kim commented on FLINK-33066:
-

Thanks [~gyfora], I have opened the pr :)

> Enable to inject environment variable from secret/configmap to operatorPod
> --
>
> Key: FLINK-33066
> URL: https://issues.apache.org/jira/browse/FLINK-33066
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: dongwoo.kim
>Assignee: dongwoo.kim
>Priority: Minor
>  Labels: pull-request-available
>
> Hello, I've been working with the Flink Kubernetes operator and noticed that 
> the {{operatorPod.env}} only allows for simple key-value pairs and doesn't 
> support Kubernetes {{valueFrom}} syntax.
> How about changing template to support more various k8s syntax? 
> *Current template*
> {code:java}
> {{- range $k, $v := .Values.operatorPod.env }}
>   - name: {{ $v.name | quote }}
>     value: {{ $v.value | quote }}
> {{- end }}{code}
>  
> *Proposed template*
> 1) Modify template like below 
> {code:java}
> {{- with .Values.operatorPod.env }} 
> {{- toYaml . | nindent 12 }} 
> {{- end }} 
> {code}
> 2) create extra config, *Values.operatorPod.envFrom* and utilize this
>  
> I'd be happy to implement this update if it's approved.
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33066) Enable to inject environment variable from secret/configmap to operatorPod

2023-09-08 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-33066:

Description: 
Hello, I've been working with the Flink Kubernetes operator and noticed that 
the {{operatorPod.env}} only allows for simple key-value pairs and doesn't 
support Kubernetes {{valueFrom}} syntax.

How about changing the template to support a wider range of Kubernetes syntax? 

*Current template*
{code:java}
{{- range $k, $v := .Values.operatorPod.env }}
  - name: {{ $v.name | quote }}
    value: {{ $v.value | quote }}
{{- end }}{code}
 

*Proposed template*
1) Modify template like below 
{code:java}
{{- with .Values.operatorPod.env }} 
{{- toYaml . | nindent 12 }} 
{{- end }} 
{code}
2) create extra config, *Values.operatorPod.envFrom* and utilize this

 

I'd be happy to implement this update if it's approved.
Thanks in advance.

  was:
Hello, I've been working with the Flink Kubernetes operator and noticed that 
the {{operatorPod.env}} only allows for simple key-value pairs and doesn't 
support Kubernetes {{valueFrom}} syntax.

How about changing template to support more various k8s syntax? 

*Current template*
{code:java}
{{- range $k, $v := .Values.operatorPod.env }}
  - name: {{ $v.name | quote }}
    value: {{ $v.value | quote }}
{{- end }}{code}
 

*Proposed template*
1) 
{code:java}
{{- with .Values.operatorPod.env }} 
{{- toYaml . | nindent 12 }} 
{{- end }} 
{code}
2) create extra config, *Values.operatorPod.envFrom* and utilize this

 

I'd be happy to implement this update if it's approved.
Thanks in advance.


> Enable to inject environment variable from secret/configmap to operatorPod
> --
>
> Key: FLINK-33066
> URL: https://issues.apache.org/jira/browse/FLINK-33066
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: dongwoo.kim
>Priority: Minor
>
> Hello, I've been working with the Flink Kubernetes operator and noticed that 
> the {{operatorPod.env}} only allows for simple key-value pairs and doesn't 
> support Kubernetes {{valueFrom}} syntax.
> How about changing template to support more various k8s syntax? 
> *Current template*
> {code:java}
> {{- range $k, $v := .Values.operatorPod.env }}
>   - name: {{ $v.name | quote }}
>     value: {{ $v.value | quote }}
> {{- end }}{code}
>  
> *Proposed template*
> 1) Modify template like below 
> {code:java}
> {{- with .Values.operatorPod.env }} 
> {{- toYaml . | nindent 12 }} 
> {{- end }} 
> {code}
> 2) create extra config, *Values.operatorPod.envFrom* and utilize this
>  
> I'd be happy to implement this update if it's approved.
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33066) Enable to inject environment variable from secret/configmap to operatorPod

2023-09-08 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-33066:
---

 Summary: Enable to inject environment variable from 
secret/configmap to operatorPod
 Key: FLINK-33066
 URL: https://issues.apache.org/jira/browse/FLINK-33066
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: dongwoo.kim


Hello, I've been working with the Flink Kubernetes operator and noticed that 
the {{operatorPod.env}} only allows for simple key-value pairs and doesn't 
support Kubernetes {{valueFrom}} syntax.

How about changing template to support more various k8s syntax? 

*Current template*
{code:java}
{{- range $k, $v := .Values.operatorPod.env }}
  - name: {{ $v.name | quote }}
    value: {{ $v.value | quote }}
{{- end }}{code}
 

*Proposed template*
1) 
{code:java}
{{- with .Values.operatorPod.env }} 
{{- toYaml . | nindent 12 }} 
{{- end }} 
{code}
2) create extra config, *Values.operatorPod.envFrom* and utilize this

 

 

I'd be happy to implement this update if it's approved.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33066) Enable to inject environment variable from secret/configmap to operatorPod

2023-09-08 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-33066:

Description: 
Hello, I've been working with the Flink Kubernetes operator and noticed that 
the {{operatorPod.env}} only allows for simple key-value pairs and doesn't 
support Kubernetes {{valueFrom}} syntax.

How about changing the template to support a wider range of Kubernetes syntax?

*Current template*
{code:java}
{{- range $k, $v := .Values.operatorPod.env }}
  - name: {{ $v.name | quote }}
    value: {{ $v.value | quote }}
{{- end }}{code}
 

*Proposed template*
1) Modify the template like below:
{code:java}
{{- with .Values.operatorPod.env }} 
{{- toYaml . | nindent 12 }} 
{{- end }} 
{code}
2) create extra config, *Values.operatorPod.envFrom* and utilize this

 

I'd be happy to implement this update if it's approved.
Thanks in advance.

  was:
Hello, I've been working with the Flink Kubernetes operator and noticed that 
the {{operatorPod.env}} only allows for simple key-value pairs and doesn't 
support Kubernetes {{valueFrom}} syntax.

How about changing template to support more various k8s syntax? 

*Current template*
{code:java}
{{- range $k, $v := .Values.operatorPod.env }}
  - name: {{ $v.name | quote }}
    value: {{ $v.value | quote }}
{{- end }}{code}
 

*Proposed template*
1) 
{code:java}
{{- with .Values.operatorPod.env }} 
{{- toYaml . | nindent 12 }} 
{{- end }} 
{code}
2) create extra config, *Values.operatorPod.envFrom* and utilize this

 

 

I'd be happy to implement this update if it's approved.


> Enable to inject environment variable from secret/configmap to operatorPod
> --
>
> Key: FLINK-33066
> URL: https://issues.apache.org/jira/browse/FLINK-33066
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: dongwoo.kim
>Priority: Minor
>
> Hello, I've been working with the Flink Kubernetes operator and noticed that 
> the {{operatorPod.env}} only allows for simple key-value pairs and doesn't 
> support Kubernetes {{valueFrom}} syntax.
> How about changing template to support more various k8s syntax? 
> *Current template*
> {code:java}
> {{- range $k, $v := .Values.operatorPod.env }}
>   - name: {{ $v.name | quote }}
>     value: {{ $v.value | quote }}
> {{- end }}{code}
>  
> *Proposed template*
> 1) 
> {code:java}
> {{- with .Values.operatorPod.env }} 
> {{- toYaml . | nindent 12 }} 
> {{- end }} 
> {code}
> 2) create extra config, *Values.operatorPod.envFrom* and utilize this
>  
> I'd be happy to implement this update if it's approved.
> Thanks in advance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages

2023-08-24 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758611#comment-17758611
 ] 

dongwoo.kim edited comment on FLINK-32760 at 8/24/23 2:17 PM:
--

Hi [~luoyuxia], I have opened the backport PRs. 
Please review them when you have time. 
1.16: [https://github.com/apache/flink/pull/23286]
1.17: [https://github.com/apache/flink/pull/23287]
1.18: [https://github.com/apache/flink/pull/23288] 


was (Author: JIRAUSER300481):
Hi [~luoyuxia] , I have opened backport PRs. 
Please check in your free time. 
[https://github.com/apache/flink/pull/23286]
[https://github.com/apache/flink/pull/23287]
[https://github.com/apache/flink/pull/23288] 

> Version Conflict in flink-sql-connector-hive for shaded.parquet prefix 
> packages
> ---
>
> Key: FLINK-32760
> URL: https://issues.apache.org/jira/browse/FLINK-32760
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-08-05-14-50-47-806.png
>
>
> h2. Summary
> In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
> parquet dependency from *hive-exec* is done. 
> But I think this is not enough and causing errors like below when I try to 
> read parquet file using sql-gateway which requires both *flink-parquet* and 
> *flink-sql-connector-hive* dependencies.
> !image-2023-08-05-14-50-47-806.png|width=1392,height=909!
>  
> h2. {color:#172b4d}Cause{color}
> {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but 
> also *shaded.parquet* prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}
> {color:#172b4d}So we need to shade both.{color}
> {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
> 0.16.0 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
> Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
> 0.9.3, causing the error.{color}
> h2. {color:#172b4d}Proposed solution{color}
> Adding new shading rule to flink-sql-connector-hive project.
> I have confirmed that adding this rule could resolve the above error.
> {code:xml}
> 
>  shaded.parquet
>  shaded.parquet.flink.hive.shaded
> {code}
>  
> I would be happy to implement it if the proposal is accepted. Thanks
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages

2023-08-24 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17758611#comment-17758611
 ] 

dongwoo.kim commented on FLINK-32760:
-

Hi [~luoyuxia] , I have opened backport PRs. 
Please check in your free time. 
[https://github.com/apache/flink/pull/23286]
[https://github.com/apache/flink/pull/23287]
[https://github.com/apache/flink/pull/23288] 

> Version Conflict in flink-sql-connector-hive for shaded.parquet prefix 
> packages
> ---
>
> Key: FLINK-32760
> URL: https://issues.apache.org/jira/browse/FLINK-32760
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-08-05-14-50-47-806.png
>
>
> h2. Summary
> In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
> parquet dependency from *hive-exec* is done. 
> But I think this is not enough and causing errors like below when I try to 
> read parquet file using sql-gateway which requires both *flink-parquet* and 
> *flink-sql-connector-hive* dependencies.
> !image-2023-08-05-14-50-47-806.png|width=1392,height=909!
>  
> h2. {color:#172b4d}Cause{color}
> {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but 
> also *shaded.parquet* prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}
> {color:#172b4d}So we need to shade both.{color}
> {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
> 0.16.0 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
> Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
> 0.9.3, causing the error.{color}
> h2. {color:#172b4d}Proposed solution{color}
> Adding new shading rule to flink-sql-connector-hive project.
> I have confirmed that adding this rule could resolve the above error.
> {code:xml}
> 
>  shaded.parquet
>  shaded.parquet.flink.hive.shaded
> {code}
>  
> I would be happy to implement it if the proposal is accepted. Thanks
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages

2023-08-09 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17752577#comment-17752577
 ] 

dongwoo.kim commented on FLINK-32760:
-

Hi [~luoyuxia], I have opened the PR. 
I'd greatly appreciate it if you could review it when you have some free 
time. Thank you!

> Version Conflict in flink-sql-connector-hive for shaded.parquet prefix 
> packages
> ---
>
> Key: FLINK-32760
> URL: https://issues.apache.org/jira/browse/FLINK-32760
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-08-05-14-50-47-806.png
>
>
> h2. Summary
> In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
> parquet dependency from *hive-exec* is done. 
> But I think this is not enough and causing errors like below when I try to 
> read parquet file using sql-gateway which requires both *flink-parquet* and 
> *flink-sql-connector-hive* dependencies.
> !image-2023-08-05-14-50-47-806.png|width=1392,height=909!
>  
> h2. {color:#172b4d}Cause{color}
> {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but 
> also *shaded.parquet* prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}
> {color:#172b4d}So we need to shade both.{color}
> {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
> 0.16.0 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
> Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
> 0.9.3, causing the error.{color}
> h2. {color:#172b4d}Proposed solution{color}
> Adding new shading rule to flink-sql-connector-hive project.
> I have confirmed that adding this rule could resolve the above error.
> {code:xml}
> 
>  shaded.parquet
>  shaded.parquet.flink.hive.shaded
> {code}
>  
> I would be happy to implement it if the proposal is accepted. Thanks
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages

2023-08-07 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751715#comment-17751715
 ] 

dongwoo.kim commented on FLINK-32760:
-

Thanks for your help. I'll share the PR soon.

> Version Conflict in flink-sql-connector-hive for shaded.parquet prefix 
> packages
> ---
>
> Key: FLINK-32760
> URL: https://issues.apache.org/jira/browse/FLINK-32760
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
> Attachments: image-2023-08-05-14-50-47-806.png
>
>
> h2. Summary
> In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
> parquet dependency from *hive-exec* is done. 
> But I think this is not enough and causing errors like below when I try to 
> read parquet file using sql-gateway which requires both *flink-parquet* and 
> *flink-sql-connector-hive* dependencies.
> !image-2023-08-05-14-50-47-806.png|width=1392,height=909!
>  
> h2. {color:#172b4d}Cause{color}
> {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but 
> also *shaded.parquet* prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}
> {color:#172b4d}So we need to shade both.{color}
> {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
> 0.16.0 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
> Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
> 0.9.3, causing the error.{color}
> h2. {color:#172b4d}Proposed solution{color}
> Adding new shading rule to flink-sql-connector-hive project.
> I have confirmed that adding this rule could resolve the above error.
> {code:xml}
> 
>  shaded.parquet
>  shaded.parquet.flink.hive.shaded
> {code}
>  
> I would be happy to implement it if the proposal is accepted. Thanks
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages

2023-08-06 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-32760:

Affects Version/s: 1.17.1

> Version Conflict in flink-sql-connector-hive for shaded.parquet prefix 
> packages
> ---
>
> Key: FLINK-32760
> URL: https://issues.apache.org/jira/browse/FLINK-32760
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
> Attachments: image-2023-08-05-14-50-47-806.png
>
>
> h2. Summary
> In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
> parquet dependency from *hive-exec* is done. 
> But I think this is not enough and causing errors like below when I try to 
> read parquet file using sql-gateway which requires both *flink-parquet* and 
> *flink-sql-connector-hive* dependencies.
> !image-2023-08-05-14-50-47-806.png|width=1392,height=909!
>  
> h2. {color:#172b4d}Cause{color}
> {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but 
> also *shaded.parquet* prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}
> {color:#172b4d}So we need to shade both.{color}
> {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
> 0.16.0 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
> Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
> 0.9.3, causing the error.{color}
> h2. {color:#172b4d}Proposed solution{color}
> Adding new shading rule to flink-sql-connector-hive project.
> I have confirmed that adding this rule could resolve the above error.
> {code:xml}
> 
>  shaded.parquet
>  shaded.parquet.flink.hive.shaded
> {code}
>  
> I would be happy to implement it if the proposal is accepted. Thanks
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages

2023-08-05 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-32760:

Description: 
h2. Summary

In https://issues.apache.org/jira/browse/FLINK-23074 the parquet dependency from 
*hive-exec* appears to have been shaded. 

But I think this is not enough: it still causes errors like the one below when I try to 
read a parquet file using the sql-gateway, which requires both *flink-parquet* and 
*flink-sql-connector-hive* dependencies.

!image-2023-08-05-14-50-47-806.png|width=1392,height=909!

 
h2. {color:#172b4d}Cause{color}

{color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but 
also *shaded.parquet* prefix dependencies. 
([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}

{color:#172b4d}So we need to shade both.{color}

{color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
0.16.0 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
0.9.3, causing the error.{color}
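
For background, the {{shaded.parquet}} prefix comes from parquet-mr relocating its own dependencies (such as Thrift) in its pom, roughly with a rule of the following shape (a sketch only; see the linked ref above for the exact rule). Because the two Parquet versions relocate different Thrift versions under the same prefix, the classes collide:
{code:xml}
<!-- rough sketch of the kind of relocation in parquet-mr's own pom -->
<relocation>
  <pattern>org.apache.thrift</pattern>
  <shadedPattern>shaded.parquet.org.apache.thrift</shadedPattern>
</relocation>
{code}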
h2. {color:#172b4d}Proposed solution{color}

Add a new shading rule to the flink-sql-connector-hive project.

I have confirmed that adding this rule could resolve the above error.
{code:xml}
<relocation>
  <pattern>shaded.parquet</pattern>
  <shadedPattern>shaded.parquet.flink.hive.shaded</shadedPattern>
</relocation>
{code}
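For context, a rough sketch of where such a rule could sit in the flink-sql-connector-hive pom, assuming the module already configures maven-shade-plugin with other relocations (surrounding elements abbreviated, names illustrative):
{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- existing relocations (e.g. org.apache.parquet) stay as they are -->
          <relocation>
            <pattern>shaded.parquet</pattern>
            <shadedPattern>shaded.parquet.flink.hive.shaded</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}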
 

I would be happy to implement it if the proposal is accepted. Thanks

 

  was:
h2. Summary

In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
parquet dependency from *hive-exec* is done. 

But I think this is not enough and causing errors like below when I try to read 
parquet file using sql-gateway which requires both *flink-parquet* and 
*flink-sql-connector-hive* dependencies.

!image-2023-08-05-14-50-47-806.png|width=1392,height=909!

 
h2. {color:#172b4d}Cause{color}

{color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but 
also *shaded.parquet* prefix dependencies. 
([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}

{color:#172b4d}So we need to shade both.{color}

{color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
0.16.0 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
0.9.3, causing the error.{color}
h2. {color:#172b4d}Proposed solution{color}

Adding new shading rule to flink-sql-connector-hive project.

I have confirmed that if we add below rule, the above error is resolved.
{code:xml}
<relocation>
  <pattern>shaded.parquet</pattern>
  <shadedPattern>shaded.parquet.flink.hive.shaded</shadedPattern>
</relocation>
{code}
 

I would be happy to implement it if the proposal is accepted. Thanks

 


> Version Conflict in flink-sql-connector-hive for shaded.parquet prefix 
> packages
> ---
>
> Key: FLINK-32760
> URL: https://issues.apache.org/jira/browse/FLINK-32760
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: dongwoo.kim
>Priority: Major
> Attachments: image-2023-08-05-14-50-47-806.png
>
>
> h2. Summary
> In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
> parquet dependency from *hive-exec* is done. 
> But I think this is not enough and causing errors like below when I try to 
> read parquet file using sql-gateway which requires both *flink-parquet* and 
> *flink-sql-connector-hive* dependencies.
> !image-2023-08-05-14-50-47-806.png|width=1392,height=909!
>  
> h2. {color:#172b4d}Cause{color}
> {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but 
> also *shaded.parquet* prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}
> {color:#172b4d}So we need to shade both.{color}
> {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
> 0.16.0 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
> Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
> 0.9.3, causing the error.{color}
> h2. {color:#172b4d}Proposed solution{color}
> Adding new shading rule to flink-sql-connector-hive project.
> I have confirmed that adding this rule could resolve the above error.
> {code:xml}
> 
>  shaded.parquet
>  shaded.parquet.flink.hive.shaded
> {code}
>  
> I would be happy 

[jira] [Updated] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages

2023-08-05 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-32760:

Description: 
h2. Summary

In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
parquet dependency from *hive-exec* is done. 

But I think this is not enough and causing errors like below when I try to read 
parquet file using sql-gateway which requires both *flink-parquet* and 
*flink-sql-connector-hive* dependencies.

!image-2023-08-05-14-50-47-806.png|width=1392,height=909!

 
h2. {color:#172b4d}Cause{color}

{color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but 
also *shaded.parquet* prefix dependencies. 
([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}

{color:#172b4d}So we need to shade both.{color}

{color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
0.16.0 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
0.9.3, causing the error.{color}
h2. {color:#172b4d}Proposed solution{color}

Adding new shading rule to flink-sql-connector-hive project.

I have confirmed that if we add below rule, the above error is resolved.
{code:xml}
<relocation>
  <pattern>shaded.parquet</pattern>
  <shadedPattern>shaded.parquet.flink.hive.shaded</shadedPattern>
</relocation>
{code}
 

I would be happy to implement it if the proposal is accepted. Thanks

 

  was:
h2. Summary

In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
parquet dependency from *hive-exec* is done. 

But I think this is not enough and causing errors like below when I try to read 
parquet file using sql-gateway which requires both *flink-parquet* and 
*flink-sql-connector-hive* dependencies.

!image-2023-08-05-14-50-47-806.png|width=1392,height=909!

 
h2. {color:#172b4d}Cause{color}

{color:#172b4d}Parquet dependency not only includes org.apache.parquet but also 
shaded.parquet prefix dependencies. 
([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}

{color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
0.16.0 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
0.9.3, causing the error.{color}
h2. {color:#172b4d}Proposed solution{color}

Adding new shading rule to flink-sql-connector-hive project.

I have confirmed that if we add below rule, the above error is resolved.
{code:xml}
<relocation>
  <pattern>shaded.parquet</pattern>
  <shadedPattern>shaded.parquet.flink.hive.shaded</shadedPattern>
</relocation>
{code}
 

I would be happy to implement it if the proposal is accepted. Thanks

 


> Version Conflict in flink-sql-connector-hive for shaded.parquet prefix 
> packages
> ---
>
> Key: FLINK-32760
> URL: https://issues.apache.org/jira/browse/FLINK-32760
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: dongwoo.kim
>Priority: Major
> Attachments: image-2023-08-05-14-50-47-806.png
>
>
> h2. Summary
> In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
> parquet dependency from *hive-exec* is done. 
> But I think this is not enough and causing errors like below when I try to 
> read parquet file using sql-gateway which requires both *flink-parquet* and 
> *flink-sql-connector-hive* dependencies.
> !image-2023-08-05-14-50-47-806.png|width=1392,height=909!
>  
> h2. {color:#172b4d}Cause{color}
> {color:#172b4d}Parquet dependency not only includes *org.apache.parquet* but 
> also *shaded.parquet* prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}
> {color:#172b4d}So we need to shade both.{color}
> {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
> 0.16.0 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
> Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
> 0.9.3, causing the error.{color}
> h2. {color:#172b4d}Proposed solution{color}
> Adding new shading rule to flink-sql-connector-hive project.
> I have confirmed that if we add below rule, the above error is resolved.
> {code:xml}
> 
>  shaded.parquet
>  shaded.parquet.flink.hive.shaded
> {code}
>  
> I would be happy to implement it if the proposal is accepted. 

[jira] [Updated] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages

2023-08-05 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-32760:

Description: 
h2. Summary

In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
parquet dependency from *hive-exec* is done. 

But I think this is not enough and causing errors like below when I try to read 
parquet file using sql-gateway which requires both *flink-parquet* and 
*flink-sql-connector-hive* dependencies.

!image-2023-08-05-14-50-47-806.png|width=1392,height=909!

 
h2. {color:#172b4d}Cause{color}

{color:#172b4d}Parquet dependency not only includes org.apache.parquet but also 
shaded.parquet prefix dependencies. 
([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}

{color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
0.16.0 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
0.9.3, causing the error.{color}
h2. {color:#172b4d}Proposed solution{color}

Adding new shading rule to flink-sql-connector-hive project.

I have confirmed that if we add below rule, the above error is resolved.
{code:xml}
<relocation>
  <pattern>shaded.parquet</pattern>
  <shadedPattern>shaded.parquet.flink.hive.shaded</shadedPattern>
</relocation>
{code}
 

I would be happy to implement it if the proposal is accepted. Thanks

 

  was:
h2. Summary

In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
parquet dependency from *hive-exec* is done. 

But I think this is not enough and causing errors like below when I try to read 
parquet file using sql-gateway which requires both *flink-parquet* and 
*flink-sql-connector-hive* dependencies.

!image-2023-08-05-14-50-47-806.png|width=1392,height=909!

 
h2. {color:#172b4d}Cause{color}

{color:#172b4d}Parquet dependency not only includes org.apache.parquet but also 
shaded.parquet prefix dependencies. 
([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]
 {color}

{color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
0.16.0 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
0.9.3, causing the error.{color}
h2. {color:#172b4d}Proposed solution{color}

Adding new shading rule to flink-sql-connector-hive project.

I have confirmed that if we add below rule, the above error is resolved.
{code:xml}
<relocation>
  <pattern>shaded.parquet</pattern>
  <shadedPattern>shaded.parquet.flink.hive.shaded</shadedPattern>
</relocation>
{code}
 

I would be happy to implement it if the proposal is accepted. Thanks

 


> Version Conflict in flink-sql-connector-hive for shaded.parquet prefix 
> packages
> ---
>
> Key: FLINK-32760
> URL: https://issues.apache.org/jira/browse/FLINK-32760
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: dongwoo.kim
>Priority: Major
> Attachments: image-2023-08-05-14-50-47-806.png
>
>
> h2. Summary
> In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
> parquet dependency from *hive-exec* is done. 
> But I think this is not enough and causing errors like below when I try to 
> read parquet file using sql-gateway which requires both *flink-parquet* and 
> *flink-sql-connector-hive* dependencies.
> !image-2023-08-05-14-50-47-806.png|width=1392,height=909!
>  
> h2. {color:#172b4d}Cause{color}
> {color:#172b4d}Parquet dependency not only includes org.apache.parquet but 
> also shaded.parquet prefix dependencies. 
> ([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]){color}
> {color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
> 0.16.0 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
> Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}
> {color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
> 0.9.3, causing the error.{color}
> h2. {color:#172b4d}Proposed solution{color}
> Adding new shading rule to flink-sql-connector-hive project.
> I have confirmed that if we add below rule, the above error is resolved.
> {code:xml}
> 
>  shaded.parquet
>  shaded.parquet.flink.hive.shaded
> {code}
>  
> I would be happy to implement it if the proposal is accepted. Thanks
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32760) Version Conflict in flink-sql-connector-hive for shaded.parquet prefix packages

2023-08-05 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-32760:
---

 Summary: Version Conflict in flink-sql-connector-hive for 
shaded.parquet prefix packages
 Key: FLINK-32760
 URL: https://issues.apache.org/jira/browse/FLINK-32760
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: dongwoo.kim
 Attachments: image-2023-08-05-14-50-47-806.png

h2. Summary

In https://issues.apache.org/jira/browse/FLINK-23074 it seems like shading 
parquet dependency from *hive-exec* is done. 

But I think this is not enough and causing errors like below when I try to read 
parquet file using sql-gateway which requires both *flink-parquet* and 
*flink-sql-connector-hive* dependencies.

!image-2023-08-05-14-50-47-806.png|width=1392,height=909!

 
h2. {color:#172b4d}Cause{color}

{color:#172b4d}Parquet dependency not only includes org.apache.parquet but also 
shaded.parquet prefix dependencies. 
([ref|https://github.com/apache/parquet-mr/blob/515734c373f69b5250e8b63eb3d1c973da893b63/pom.xml#L72]
 {color}

{color:#172b4d}- flink-parquet depends on Parquet 1.12.3 with shaded Thrift 
0.16.0 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- flink-sql-connector-hive depends on hive-exec 3.1.3 with 
Parquet 1.10.0 and shaded Thrift 0.9.3 (prefix: {{{}shaded.parquet{}}}){color}

{color:#172b4d}- Code compiled against Thrift 0.16.0 attempts to run against 
0.9.3, causing the error.{color}
h2. {color:#172b4d}Proposed solution{color}

Adding new shading rule to flink-sql-connector-hive project.

I have confirmed that if we add below rule, the above error is resolved.
{code:xml}
<relocation>
  <pattern>shaded.parquet</pattern>
  <shadedPattern>shaded.parquet.flink.hive.shaded</shadedPattern>
</relocation>
{code}
 

I would be happy to implement it if the proposal is accepted. Thanks

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735717#comment-17735717
 ] 

dongwoo.kim edited comment on FLINK-32408 at 6/21/23 1:22 PM:
--

I think this is fixed in the new version, because in the main branch's pom.xml the flink 
version has been updated from *1.16.1* to *1.17.1*.
But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports 
flink 1.17, since it had the flink version configured to 1.16.1.


was (Author: JIRAUSER300481):
I think this is fixed in new version because in main branch's pom.xml, flink 
version is updated from *1.16.1* to *1.17.1.*
But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports 
flink 1.17 version, since they had flink version to 1.16.1

> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In flink 1.17 documentation it says, to configure job manger ha we have to 
> configure *high-availability.type* key not *high-availability* key{*}.{*} (It 
> seems to be changed from 1.17)
> And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. 
> So I expected that configuring job manager ha with *high-availability.type* 
> should work but it didn't, only *high-availability* works
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735717#comment-17735717
 ] 

dongwoo.kim edited comment on FLINK-32408 at 6/21/23 1:21 PM:
--

I think this is fixed in new version because in main branch's pom.xml, flink 
version is updated from *1.16.1* to *1.17.1.*
But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports 
flink 1.17 version, since they had flink version to 1.16.1


was (Author: JIRAUSER300481):
I think this is fixed in new version because in main branch's pom.xml, flink 
version is updated from *1.16.1* to *1.17.1.*
But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports 
flink 1.17 version.

> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In flink 1.17 documentation it says, to configure job manger ha we have to 
> configure *high-availability.type* key not *high-availability* key{*}.{*} (It 
> seems to be changed from 1.17)
> And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. 
> So I expected that configuring job manager ha with *high-availability.type* 
> should work but it didn't, only *high-availability* works
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735717#comment-17735717
 ] 

dongwoo.kim commented on FLINK-32408:
-

I think this is fixed in new version because in main branch's pom.xml, flink 
version is updated from *1.16.1* to *1.17.1.*
But then I'm curious whether we can say that kubernetes-operator-1.5.0 supports 
flink 1.17 version.

> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In flink 1.17 documentation it says, to configure job manger ha we have to 
> configure *high-availability.type* key not *high-availability* key{*}.{*} (It 
> seems to be changed from 1.17)
> And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. 
> So I expected that configuring job manager ha with *high-availability.type* 
> should work but it didn't, only *high-availability* works
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-32408:

Description: 
The flink 1.17 documentation says that to configure JobManager HA we have to set the 
*high-availability.type* key, not the *high-availability* key. (This appears to have 
changed in 1.17.)

Currently kubernetes-operator-1.5.0 states that it supports flink 1.17, so I expected 
that configuring JobManager HA with *high-availability.type* would work, but it didn't; 
only *high-availability* works.
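
To make the mismatch concrete, a minimal sketch of the two spellings as they could appear in a FlinkDeployment's flinkConfiguration (key names from the linked docs, values illustrative):
{code:yaml}
# Flink 1.17 style, per the linked docs (did not take effect through the operator)
flinkConfiguration:
  high-availability.type: kubernetes
  high-availability.storageDir: s3://my-bucket/flink-ha   # illustrative path

# Legacy key, which is what actually worked with kubernetes-operator-1.5.0
flinkConfiguration:
  high-availability: kubernetes
  high-availability.storageDir: s3://my-bucket/flink-ha   # illustrative path
{code}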

*ref*
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
 

  was:
In flink 1.17 documentation it says, to configure job manger ha we have to 
configure *high-availability.type* key not *high-availability* key{*}.{*} (It 
seems to be changed from 1.17)

And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. 
So I expected that configuring job manager ha with *high-availability.type* 
should work but it didn't{*}.{*}

*ref*
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
 


> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In flink 1.17 documentation it says, to configure job manger ha we have to 
> configure *high-availability.type* key not *high-availability* key{*}.{*} (It 
> seems to be changed from 1.17)
> And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. 
> So I expected that configuring job manager ha with *high-availability.type* 
> should work but it didn't, only *high-availability* works
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-32408:

Description: 
In flink 1.17 documentation it says, to configure job manger ha we have to 
configure *high-availability.type* key not *high-availability* key{*}.{*} (It 
seems to be changed from 1.17)

And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. 
So I expected that configuring job manager ha with *high-availability.type* 
should work but it didn't{*}.{*}

*ref*
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
 

  was:
In flink 1.17 documentation it says, to configure job manger ha we have to 
configure *high-availability.type* key not *high-availability* key{*}.{*} (It 
seems to be changed from 1.17)

And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. 
So I expected that configuring job manager ha with *high-availability.type* 
should work ** but it didn't{*}.{*}


ref: 
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
 


> JobManager HA configuration update needed in Flink k8s Operator 
> 
>
> Key: FLINK-32408
> URL: https://issues.apache.org/jira/browse/FLINK-32408
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.5.0
>Reporter: dongwoo.kim
>Priority: Minor
> Fix For: kubernetes-operator-1.6.0
>
>
> In flink 1.17 documentation it says, to configure job manger ha we have to 
> configure *high-availability.type* key not *high-availability* key{*}.{*} (It 
> seems to be changed from 1.17)
> And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. 
> So I expected that configuring job manager ha with *high-availability.type* 
> should work but it didn't{*}.{*}
> *ref*
> [https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
>  
> [https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32408) JobManager HA configuration update needed in Flink k8s Operator

2023-06-21 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-32408:
---

 Summary: JobManager HA configuration update needed in Flink k8s 
Operator 
 Key: FLINK-32408
 URL: https://issues.apache.org/jira/browse/FLINK-32408
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: dongwoo.kim
 Fix For: kubernetes-operator-1.6.0


In flink 1.17 documentation it says, to configure job manger ha we have to 
configure *high-availability.type* key not *high-availability* key{*}.{*} (It 
seems to be changed from 1.17)

And currently kubernetes-operator-1.5.0 says it supports flink 1.17 version. 
So I expected that configuring job manager ha with *high-availability.type* 
should work ** but it didn't{*}.{*}


ref: 
[https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#high-availability]
 
[https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/concepts/overview/#core]
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)