[jira] [Updated] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner via init Context
[ https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steven Zhen Wu updated FLINK-35384:
-----------------------------------
Summary: Expose TaskIOMetricGroup to custom Partitioner via init Context (was: Expose TaskIOMetricGroup to custom Partitioner)

> Expose TaskIOMetricGroup to custom Partitioner via init Context
> ---------------------------------------------------------------
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Metrics
> Affects Versions: 1.9.4
> Reporter: Steven Zhen Wu
> Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg sink and want to publish some counter metrics for certain scenarios, similar to the network metrics exposed in `TaskIOMetricGroup`.
>
> We can implement the range partitioner using the public interface from `DataStream`:
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the public `Partitioner` interface, where we can implement the custom range partitioner.
>
> The `Partitioner` interface is a functional interface today, so we can add a new default `setup` method without breaking backward compatibility:
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(TaskIOMetricGroup metrics) {}
>     int partition(K key, int numPartitions);
> }
> {code}
> I know a change to a public interface requires a FLIP process. I will start one if the community agrees with this feature request.
>
> Personally, I think `numPartitions` should be passed in the `setup` method too, but that is a breaking change that is NOT worth the benefit right now.
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     public void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method, which we would need to modify to pass in the metrics group:
> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
>
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
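To make the proposed `setup` hook concrete, here is a minimal self-contained sketch of how a custom partitioner might use it. `Counter` and `TaskIOMetricGroup` below are simplified stand-ins for Flink's real classes, and `RangePartitioner` with its fixed key range of [0, 1000) is a hypothetical example, not code from the Iceberg sink:

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins for Flink's org.apache.flink.metrics.Counter and
// org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup, so the sketch
// compiles on its own.
interface Counter {
    void inc();
    long getCount();
}

class TaskIOMetricGroup {
    Counter counter(String name) {
        AtomicLong count = new AtomicLong();
        return new Counter() {
            @Override public void inc() { count.incrementAndGet(); }
            @Override public long getCount() { return count.get(); }
        };
    }
}

// The proposed interface change: partition() is unchanged; setup() is a new
// default method, so existing lambda implementations keep compiling.
interface Partitioner<K> extends Serializable {
    default void setup(TaskIOMetricGroup metrics) {}
    int partition(K key, int numPartitions);
}

// Hypothetical range partitioner over keys in [0, 1000) that counts keys
// falling outside the expected range.
class RangePartitioner implements Partitioner<Long> {
    private transient Counter outOfRangeKeys;

    @Override
    public void setup(TaskIOMetricGroup metrics) {
        outOfRangeKeys = metrics.counter("outOfRangeKeys");
    }

    @Override
    public int partition(Long key, int numPartitions) {
        if (key < 0 || key >= 1000) {
            outOfRangeKeys.inc();
            return numPartitions - 1; // route stragglers to the last channel
        }
        return (int) (key * numPartitions / 1000);
    }
}
```

Because `setup` has an empty default body, a one-method lambda such as `(key, n) -> key.hashCode() % n` still satisfies the interface, which is what keeps the change backward compatible.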
[jira] [Comment Edited] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner
[ https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847116#comment-17847116 ]

Steven Zhen Wu edited comment on FLINK-35384 at 5/17/24 12:06 AM:
------------------------------------------------------------------

One potential risk of this style of API is that it is not extensible: if we want to pass another argument to the partitioner, we have to break compatibility or add a new method.
{code}
default void setup(TaskIOMetricGroup metrics) {}
{code}
Maybe we can move to the context model that is widely used in Flink:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    int partition(K key, int numPartitions);

    default void init(Context context) {}

    interface Context {
        int numberOfChannels();
        TaskIOMetricGroup metrics();
    }
}
{code}
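The `init(Context)` variant from the comment can be sketched as follows. This is a self-contained illustration, not Flink code: `TaskIOMetricGroup` is an empty stand-in, and `ModuloPartitioner` is a hypothetical implementation. Note the no-op body needs the `default` keyword for the interface to remain functional:

```java
import java.io.Serializable;

// Empty stand-in for Flink's TaskIOMetricGroup, so the sketch compiles.
class TaskIOMetricGroup {}

// The context-based variant: new setup information can be added to Context
// later without touching Partitioner itself or its implementations.
interface Partitioner<K> extends Serializable {
    int partition(K key, int numPartitions);

    default void init(Context context) {}

    interface Context {
        int numberOfChannels();
        TaskIOMetricGroup metrics();
    }
}

// Hypothetical partitioner that remembers the channel count from init().
class ModuloPartitioner implements Partitioner<Integer> {
    private int channels;

    @Override
    public void init(Context context) {
        channels = context.numberOfChannels();
    }

    @Override
    public int partition(Integer key, int numPartitions) {
        // Uses the channel count captured from the context in init().
        return Math.floorMod(key, channels);
    }
}
```

The extensibility win is that adding, say, a task name accessor later only means adding a method to `Context`; every existing `Partitioner` implementation keeps compiling unchanged.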
[jira] [Commented] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner
[ https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847116#comment-17847116 ]

Steven Zhen Wu commented on FLINK-35384:
----------------------------------------

One potential risk of this style of API is that it is not extensible: if we want to pass another argument to the partitioner, we have to break compatibility or add a new method.
{code}
default void setup(TaskIOMetricGroup metrics) {}
{code}
Maybe we can move to the context model that is widely used in Flink:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    int partition(K key, int numPartitions);

    default void init(Context context) {}

    interface Context {
        int numberOfChannels();
        TaskIOMetricGroup metrics();
    }
}
{code}
[jira] [Updated] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner
[ https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steven Zhen Wu updated FLINK-35384:
-----------------------------------
Summary: Expose TaskIOMetricGroup to custom Partitioner (was: Expose metrics group to custom Partitioner)
[jira] [Updated] (FLINK-35384) Expose metrics group to custom Partitioner
[ https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steven Zhen Wu updated FLINK-35384:
-----------------------------------
Description:

I am trying to implement a custom range partitioner in the Flink Iceberg sink and want to publish some counter metrics for certain scenarios, similar to the network metrics exposed in `TaskIOMetricGroup`.

We can implement the range partitioner using the public interface from `DataStream`:
{code}
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector)
{code}
We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the public `Partitioner` interface, where we can implement the custom range partitioner.

The `Partitioner` interface is a functional interface today, so we can add a new default `setup` method without breaking backward compatibility:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    default void setup(TaskIOMetricGroup metrics) {}
    int partition(K key, int numPartitions);
}
{code}
I know a change to a public interface requires a FLIP process. I will start one if the community agrees with this feature request.

Personally, I think `numPartitions` should be passed in the `setup` method too, but that is a breaking change that is NOT worth the benefit right now.
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    public void setup(int numPartitions, TaskIOMetricGroup metrics) {}
    int partition(K key);
}
{code}
That would be similar to the `StreamPartitioner#setup()` method, which we would need to modify to pass in the metrics group:
{code}
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {

    @Override
    public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
        this.numberOfChannels = numberOfChannels;
    }
{code}
[jira] [Created] (FLINK-35384) Expose metrics group to custom Partitioner
Steven Zhen Wu created FLINK-35384:
-----------------------------------

Summary: Expose metrics group to custom Partitioner
Key: FLINK-35384
URL: https://issues.apache.org/jira/browse/FLINK-35384
Project: Flink
Issue Type: Improvement
Components: Runtime / Metrics
Affects Versions: 1.9.4
Reporter: Steven Zhen Wu

I am trying to implement a custom range partitioner in the Flink Iceberg sink and want to publish some counter metrics for certain scenarios, similar to the network metrics exposed in `TaskIOMetricGroup`.

We can implement the range partitioner using the public interface from `DataStream`:
```
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector)
```
We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the public `Partitioner` interface, where we can implement the custom range partitioner.

The `Partitioner` interface is a functional interface today, so we can add a new default `setup` method without breaking backward compatibility:
```
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    default void setup(TaskIOMetricGroup metrics) {}
    int partition(K key, int numPartitions);
}
```
I know a change to a public interface requires a FLIP process. I will start one if the community agrees with this feature request.

Personally, I think `numPartitions` should be passed in the `setup` method too, but that is a breaking change that is NOT worth the benefit right now.
```
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    public void setup(int numPartitions, TaskIOMetricGroup metrics) {}
    int partition(K key);
}
```
That would be similar to the `StreamPartitioner#setup()` method, which we would need to modify to pass in the metrics group:
```
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {

    @Override
    public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
        this.numberOfChannels = numberOfChannels;
    }
```
[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642050#comment-17642050 ]

Steven Zhen Wu commented on FLINK-29801:
----------------------------------------

@ruanhang1993 I am not sure `numEventsIn` is a very useful metric. Maybe hold off on adding it until it has clear value. If we do want to expose it, it would be better to add a tag with the event type/class name.

> OperatorCoordinator need open the way to operate metricGroup interface
> ----------------------------------------------------------------------
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Metrics
> Reporter: yuemeng
> Assignee: yuemeng
> Priority: Major
> Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an OperatorCoordinator. In some cases we need to report metrics from an OperatorCoordinator; for example, in the Flink/Hudi integration, some metadata is sent to the operator coordinator to be committed to HDFS or HMS, and we also need to report metrics from the operator coordinator for monitoring purposes.
[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface
[ https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17640306#comment-17640306 ]

Steven Zhen Wu commented on FLINK-29801:
----------------------------------------

This is an important missing piece of the coordinator. I am also very interested in this for the FLIP-27 Iceberg source. [~MengYue] [~ruanhang1993] do you want to start a discussion thread for FLIP-274? I have two questions.

1. There is already an empty `OperatorCoordinatorMetricGroup` interface in Flink. What is the reason to add this method to the interface?
{code}
/** The total number of events received since the operator coordinator started. */
Counter getNumEventsInCounter();
{code}
2. How can the metric group be passed to `SourceCoordinatorContext`?
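The "tag by event type/class name" suggestion from the comments above can be sketched as follows. Everything here is invented for illustration: `CoordinatorEventMetrics` and the event classes are hypothetical names, not Flink code, and `Counter` is a simplified stand-in for Flink's metric counter:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Flink's org.apache.flink.metrics.Counter.
class Counter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Hypothetical coordinator event types, for illustration only.
class SplitRequestEvent {}
class CheckpointAckEvent {}

// CoordinatorEventMetrics is an invented helper, not a Flink class: instead
// of one flat numEventsIn counter, it keeps one counter per event class,
// which is the "tag by event type/class name" idea from the comment.
class CoordinatorEventMetrics {
    private final Map<String, Counter> countersByType = new HashMap<>();

    Counter counterFor(Object event) {
        return countersByType.computeIfAbsent(
                event.getClass().getSimpleName(), type -> new Counter());
    }

    void onEventReceived(Object event) {
        counterFor(event).inc();
    }
}
```

Compared with a single `numEventsIn` counter, the per-type breakdown tells an operator which coordinator traffic (split requests, checkpoint acks, etc.) is actually growing.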
[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session
[ https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718 ]

Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 10:02 PM:
-------------------------------------------------------------------

[~fsk119] here are the steps to reproduce with 1.16.0. Note that the jar import works fine with 1.15.2.

* Download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into the Flink `lib` dir.
* Start the sql-client, importing the external jar:
{code}
./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar
{code}
* Run a SQL command:
{code}
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:/Users/stevenwu/runtime/hdfs',
  'property-version'='1'
);
{code}
* Then we see this exception:
{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog]
{code}
* We have to put the jar inside the Flink `lib/` dir for it to be loaded. Then the same SQL command executes fine:
{code}
cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/
./bin/sql-client.sh embedded
{code}
> ./bin/sql-client.sh won't import external jar into the session
> --------------------------------------------------------------
>
> Key: FLINK-30035
> URL: https://issues.apache.org/jira/browse/FLINK-30035
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client
> Affects Versions: 1.16.0
> Reporter: Steven Zhen Wu
> Priority: Major
>
> I used to be able to run the sql-client with the iceberg-flink-runtime jar using the `-j,--jar` option (e.g. with 1.15.2).
> {code}
> ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar
> {code}
> With 1.16.0, this doesn't work anymore. As a result, I am seeing a ClassNotFoundException:
> {code}
> java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog
> {code}
> I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the `flink/lib` directory for the jar to be loaded. This seems like a regression in 1.16.0.
[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session
[ https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718 ] Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 8:54 PM: -- [~fsk119] here are the steps to reproduce. * download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client with importing external jar {code} ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar {code} * run SQL cmd {code} CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); {code} Then we shall see exception {code} [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] {code} Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. {code} cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded {code} jar import works fine for 1.15.2. was (Author: stevenz3wu): [~fsk119] here are the steps to reproduce. 
* download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client: ` ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar` * run SQL cmd {code} CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); {code} Then we shall see exception {code} [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] {code} Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. {code} cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded {code} > ./bin/sql-client.sh won't import external jar into the session > -- > > Key: FLINK-30035 > URL: https://issues.apache.org/jira/browse/FLINK-30035 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.16.0 >Reporter: Steven Zhen Wu >Priority: Major > > I used to be able to run the sql-client with iceberg-flink-runtime jar using > the `-j,--jar ` option (e.g. with 1.15.2). > {code} > ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar > {code} > With 1.16.0, this doesn't work anymore. As a result, I am seeing > ClassNotFoundException. > {code} > java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog > {code} > I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the > `flink/lib` directory to make the jar loaded. This seems like a regression of > 1.16.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session
[ https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718 ] Steven Zhen Wu edited comment on FLINK-30035 at 11/27/22 8:53 PM: -- [~fsk119] here are the steps to reproduce. * download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client: ` ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar` * run SQL cmd {code} CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); {code} Then we shall see exception {code} [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] {code} Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. {code} cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded {code} was (Author: stevenz3wu): [~fsk119] here are the steps to reproduce. * download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client: ` ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar` * run SQL cmd ``` CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); ``` Then we shall see exception ``` [ERROR] Could not execute SQL statement. 
Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] ``` Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. ``` cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded ``` > ./bin/sql-client.sh won't import external jar into the session > -- > > Key: FLINK-30035 > URL: https://issues.apache.org/jira/browse/FLINK-30035 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.16.0 >Reporter: Steven Zhen Wu >Priority: Major > > I used to be able to run the sql-client with iceberg-flink-runtime jar using > the `-j,--jar ` option (e.g. with 1.15.2). > {code} > ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar > {code} > With 1.16.0, this doesn't work anymore. As a result, I am seeing > ClassNotFoundException. > {code} > java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog > {code} > I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the > `flink/lib` directory to make the jar loaded. This seems like a regression of > 1.16.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session
[ https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17639718#comment-17639718 ] Steven Zhen Wu commented on FLINK-30035: [~fsk119] here are the steps to reproduce. * download the [iceberg-flink-runtime jar|https://repository.apache.org/content/repositories/orgapacheiceberg-1114/org/apache/iceberg/iceberg-flink-runtime-1.16/1.1.0/iceberg-flink-runtime-1.16-1.1.0.jar] without putting it into Flink`lib` dir * start sql-client: ` ./bin/sql-client.sh embedded --jar /path/to/iceberg-flink-runtime-1.16-1.1.0.jar` * run SQL cmd ``` CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:/Users/stevenwu/runtime/hdfs', 'property-version'='1' ); ``` Then we shall see exception ``` [ERROR] Could not execute SQL statement. Reason: java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog Missing org.apache.iceberg.hadoop.HadoopCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog] ``` Now if we put the jar inside Flink `lib/` dir. the external jar was loaded fine. The same SQL cmd will execute fine. ``` cp /path/to/iceberg-flink-runtime-1.16-1.1.0.jar lib/ ./bin/sql-client.sh embedded ``` > ./bin/sql-client.sh won't import external jar into the session > -- > > Key: FLINK-30035 > URL: https://issues.apache.org/jira/browse/FLINK-30035 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.16.0 >Reporter: Steven Zhen Wu >Priority: Major > > I used to be able to run the sql-client with iceberg-flink-runtime jar using > the `-j,--jar ` option (e.g. with 1.15.2). > {code} > ./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar > {code} > With 1.16.0, this doesn't work anymore. As a result, I am seeing > ClassNotFoundException. 
> {code} > java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog > {code} > I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the > `flink/lib` directory to make the jar loaded. This seems like a regression of > 1.16.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session
[ https://issues.apache.org/jira/browse/FLINK-30035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-30035:
---
Description:
I used to be able to run the sql-client with the iceberg-flink-runtime jar using the `-j,--jar` option (e.g. with 1.15.2).
{code}
./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar
{code}
With 1.16.0, this doesn't work anymore. As a result, I am seeing a ClassNotFoundException.
{code}
java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog
{code}
I have to put the iceberg-flink-runtime-1.16-1.1.0.jar file inside the `flink/lib` directory to get the jar loaded. This seems like a regression in 1.16.0.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30035) ./bin/sql-client.sh won't import external jar into the session
Steven Zhen Wu created FLINK-30035:
--
Summary: ./bin/sql-client.sh won't import external jar into the session
Key: FLINK-30035
URL: https://issues.apache.org/jira/browse/FLINK-30035
Project: Flink
Issue Type: Bug
Components: Table SQL / Client
Affects Versions: 1.16.0
Reporter: Steven Zhen Wu

I used to be able to run the sql-client with the iceberg-flink-runtime jar using the `-j,--jar` option (e.g. with 1.15.2).
{code}
./bin/sql-client.sh embedded --jar iceberg-flink-runtime-1.16-1.1.0.jar
{code}
With 1.16.0, this doesn't work anymore. As a result, I am seeing a ClassNotFoundException.
{code}
java.lang.ClassNotFoundException: org.apache.iceberg.hadoop.HadoopCatalog
{code}
I have to put the `iceberg-flink-runtime-1.16-1.1.0.jar` file inside the `flink/lib` directory to get the jar loaded.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-27405) Refactor SourceCoordinator to abstract BaseCoordinator implementation
[ https://issues.apache.org/jira/browse/FLINK-27405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17530078#comment-17530078 ] Steven Zhen Wu commented on FLINK-27405:
[~arvid] thanks a lot for the feedback. We will create a FLIP for the new public abstract class `CoordinatorBase`. Yes, we can drive the effort. For now, we don't plan to put any checkpoint-related logic into `CoordinatorBase`; we will just move the minimally required (and reasonable) APIs to the base class in the first iteration.

> Refactor SourceCoordinator to abstract BaseCoordinator implementation
> --
>
> Key: FLINK-27405
> URL: https://issues.apache.org/jira/browse/FLINK-27405
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Reporter: gang ye
> Priority: Major
>
> To solve the small-files issue caused by data skewness, Flink Iceberg data shuffling was proposed (design doc: https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit).
> The basic idea is to use a statistics operator to collect local statistics for traffic distribution at the taskmanagers (workers). Local statistics are periodically sent to the statistics coordinator (running in the jobmanager). Once globally aggregated statistics are ready, the statistics coordinator broadcasts them to all operator instances. A customized partitioner then uses the global statistics, passed down from the statistics operator, to distribute data to the Iceberg writers.
> While implementing Flink Iceberg data shuffling, we found that StatisticsCoordinator can share functionality with SourceCoordinator#runInEventLoop, StatisticsCoordinatorContext needs a similar function to SourceCoordinatorContext#callInCoordinatorThread, and the StatisticsCoordinatorProvider ExecutorThreadFactory logic is almost the same as SourceCoordinatorProvider#CoordinatorExecutorThreadFactory. So we want to refactor the source coordinator classes into an abstract, general coordinator implementation to reduce duplicated code when adding other coordinators.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
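The refactoring described above boils down to hoisting the single-threaded event-loop plumbing into one base class. A minimal sketch of what such a `CoordinatorBase` could look like follows; all names and signatures here are illustrative stand-ins for the Flink internals mentioned in the issue, not Flink's actual API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the proposed CoordinatorBase: the single-threaded
// event-loop plumbing that SourceCoordinator, SourceCoordinatorContext, and
// SourceCoordinatorProvider each carry today, hoisted into one base class.
abstract class CoordinatorBase implements AutoCloseable {
    private final ExecutorService coordinatorExecutor;

    protected CoordinatorBase(String coordinatorThreadName) {
        // Analogue of SourceCoordinatorProvider#CoordinatorExecutorThreadFactory:
        // every coordinator action runs on a single named thread.
        this.coordinatorExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, coordinatorThreadName));
    }

    // Analogue of SourceCoordinator#runInEventLoop: a fire-and-forget action
    // with uniform error handling.
    public void runInEventLoop(Runnable action, String actionName) {
        coordinatorExecutor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                handleUncaughtError(actionName, t);
            }
        });
    }

    // Analogue of SourceCoordinatorContext#callInCoordinatorThread: run a
    // callable on the coordinator thread and block for its result.
    public <V> V callInCoordinatorThread(Callable<V> callable) {
        try {
            return coordinatorExecutor.submit(callable).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for coordinator thread", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Action failed in coordinator thread", e.getCause());
        }
    }

    // Subclasses (source coordinator, statistics coordinator, ...) decide how
    // to surface failures, e.g. by failing the job.
    protected void handleUncaughtError(String actionName, Throwable t) {
        System.err.println("Uncaught error in '" + actionName + "': " + t);
    }

    @Override
    public void close() {
        coordinatorExecutor.shutdown();
    }
}
```

A statistics coordinator and a source coordinator would then both extend this class and only keep their domain-specific logic, which is the code-reuse goal the issue describes.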
[jira] [Comment Edited] (FLINK-27405) Refactor SourceCoordinator to abstract BaseCoordinator implementation
[ https://issues.apache.org/jira/browse/FLINK-27405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17528443#comment-17528443 ] Steven Zhen Wu edited comment on FLINK-27405 at 4/26/22 10:42 PM:
--
cc [~arvid] [~pnowojski] [~dwysakowicz] [~thw] please share your thoughts on extracting a `CoordinatorBase` abstract class from `SourceCoordinator` to promote code reuse. If this is OK with you, [~gang ye] can create a PR later.

was (Author: stevenz3wu):
cc [~pnowojski] [~dwysakowicz] [~thw] please share your thoughts on extracting a `CoordinatorBase` abstract class from `SourceCoordinator` to promote code reuse. If this is OK with you, [~gang ye] can create a PR later.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Commented] (FLINK-27255) Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters
[ https://issues.apache.org/jira/browse/FLINK-27255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17522943#comment-17522943 ] Steven Zhen Wu commented on FLINK-27255:
[~jinyius] this issue has existed for a while now; it is not something new.

> Flink-avro does not support serialization and deserialization of avro schema longer than 65535 characters
> --
>
> Key: FLINK-27255
> URL: https://issues.apache.org/jira/browse/FLINK-27255
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.14.4
> Reporter: Haizhou Zhao
> Assignee: Haizhou Zhao
> Priority: Major
>
> The underlying serialization of the avro schema uses the string serialization method of ObjectOutputStream, but the default string serialization in ObjectOutputStream cannot handle strings of more than 65535 characters (64 KB). As a result, constructing Flink operators that input/output Avro GenericRecords with a huge schema is not possible.
>
> The proposed fix is to change the serialization and deserialization methods of the following classes so that huge strings can also be handled:
>
> [GenericRecordAvroTypeInfo|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java#L107]
> [SerializableAvroSchema|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L55]

-- This message was sent by Atlassian Jira (v8.20.1#820001)
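The 65535-character cap comes from `writeUTF`, which encodes the string length in a 2-byte unsigned field. One illustrative workaround (a sketch of the general technique, not necessarily Flink's exact patch) is to serialize the schema string as a length-prefixed UTF-8 byte array instead:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch: write a possibly huge string as a length-prefixed UTF-8 byte array.
// A 4-byte int length prefix supports strings up to 2 GB, unlike writeUTF's
// 2-byte length field, which caps encoded strings at 65535 bytes.
final class LargeStringSerde {
    static void write(DataOutputStream out, String s) throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length); // int length prefix: no 64 KB limit
        out.write(bytes);
    }

    static String read(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Applied to the two classes linked above, the schema's JSON string would be round-tripped through such a helper inside `writeObject`/`readObject` rather than through the default string serialization.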
[jira] [Commented] (FLINK-27101) Periodically break the chain of incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17522586#comment-17522586 ] Steven Zhen Wu commented on FLINK-27101: [~pnowojski] Option 3 (externally triggered checkpoint) should work well. Thanks! > Periodically break the chain of incremental checkpoint > -- > > Key: FLINK-27101 > URL: https://issues.apache.org/jira/browse/FLINK-27101 > Project: Flink > Issue Type: New Feature > Components: Runtime / Checkpointing >Reporter: Steven Zhen Wu >Priority: Major > > Incremental checkpoint is almost a must for large-state jobs. It greatly > reduces the bytes uploaded to DFS per checkpoint. However, there are a few > implications from incremental checkpoint that are problematic for production > operations. Will use S3 as an example DFS for the rest of description. > 1. Because there is no way to deterministically know how far back the > incremental checkpoint can refer to files uploaded to S3, it is very > difficult to set S3 bucket/object TTL. In one application, we have observed > Flink checkpoint referring to files uploaded over 6 months ago. S3 TTL can > corrupt the Flink checkpoints. > S3 TTL is important for a few reasons > - purge orphaned files (like external checkpoints from previous deployments) > to keep the storage cost in check. This problem can be addressed by > implementing proper garbage collection (similar to JVM) by traversing the > retained checkpoints from all jobs and traverse the file references. But that > is an expensive solution from engineering cost perspective. > - Security and privacy. E.g., there may be requirement that Flink state can't > keep the data for more than some duration threshold (hours/days/weeks). > Application is expected to purge keys to satisfy the requirement. However, > with incremental checkpoint and how deletion works in RocksDB, it is hard to > set S3 TTL to purge S3 files. 
Even though those old S3 files don't contain > live keys, they may still be referrenced by retained Flink checkpoints. > 2. Occasionally, corrupted checkpoint files (on S3) are observed. As a > result, restoring from checkpoint failed. With incremental checkpoint, it > usually doesn't help to try other older checkpoints, because they may refer > to the same corrupted file. It is unclear whether the corruption happened > before or during S3 upload. This risk can be mitigated with periodical > savepoints. > It all boils down to periodical full snapshot (checkpoint or savepoint) to > deterministically break the chain of incremental checkpoints. Search the jira > history, the behavior that FLINK-23949 [1] trying to fix is actually close to > what we would need here. > There are a few options > 1. Periodically trigger savepoints (via control plane). This is actually not > a bad practice and might be appealing to some people. The problem is that it > requires a job deployment to break the chain of incremental checkpoint. > periodical job deployment may sound hacky. If we make the behavior of full > checkpoint after a savepoint (fixed in FLINK-23949) configurable, it might be > an acceptable compromise. The benefit is that no job deployment is required > after savepoints. > 2. Build the feature in Flink incremental checkpoint. Periodically (with some > cron style config) trigger a full checkpoint to break the incremental chain. > If the full checkpoint failed (due to whatever reason), the following > checkpoints should attempt full checkpoint as well until one successful full > checkpoint is completed. > 3. For the security/privacy requirement, the main thing is to apply > compaction on the deleted keys. That could probably avoid references to the > old files. Is there any RocksDB compation can achieve full compaction of > removing old delete markers. 
Recent delete markers are fine > [1] https://issues.apache.org/jira/browse/FLINK-23949 -- This message was sent by Atlassian Jira (v8.20.1#820001)
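Option 1 (control-plane triggered snapshots) needs no Flink changes: the control plane can hit the REST API's `POST /jobs/:jobid/savepoints` endpoint on a schedule, and newer Flink versions also expose a trigger-checkpoint endpoint. A minimal sketch; the host, job id, and target directory are placeholders for whatever the deployment uses:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class SavepointTrigger {

    // Build the savepoint trigger URL for a job (POST /jobs/:jobid/savepoints).
    static String savepointUrl(String restEndpoint, String jobId) {
        return restEndpoint + "/jobs/" + jobId + "/savepoints";
    }

    public static void main(String[] args) {
        // Placeholder host and job id; real values come from the deployment.
        String url = savepointUrl("http://jobmanager:8081", "a1b2c3");
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"target-directory\": \"s3://bucket/savepoints\", \"cancel-job\": false}"))
                .build();
        System.out.println(request.method() + " " + request.uri());
        // To actually trigger it:
        //   HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // The response carries a trigger id; the control plane then polls
        // /jobs/:jobid/savepoints/:triggerid until the savepoint completes.
    }
}
```

The same loop, run on a cron schedule, gives the periodic full snapshots discussed above without any job redeployment.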
[jira] [Commented] (FLINK-27101) Periodically break the chain of incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17520811#comment-17520811 ] Steven Zhen Wu commented on FLINK-27101: [~yunta] a pluggable checkpoint trigger policy might work well here. If I understand you correctly, you are saying we can switch the triggered checkpoint from incremental to full in a customized policy based on some cron conditions. Triggering a savepoint is not a concern; it can be scheduled by the control plane. If we want to use the savepoint to break the incremental chain, we would need to redeploy the job from the savepoint, and the redeployment could feel hacky. > Periodically break the chain of incremental checkpoint > -- > > Key: FLINK-27101 > URL: https://issues.apache.org/jira/browse/FLINK-27101 > Project: Flink > Issue Type: New Feature > Components: Runtime / Checkpointing > Reporter: Steven Zhen Wu > Priority: Major
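Option 3 hinges on a full compaction dropping old delete markers (tombstones); in RocksDB this is roughly what a manual `compactRange()` over the whole key space does. The mechanics can be illustrated with a self-contained toy LSM model (not RocksDB's actual implementation, just the tombstone bookkeeping):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class ToyLsm {
    private static final String TOMBSTONE = "\u0000TOMBSTONE";

    private final Deque<Map<String, String>> runs = new ArrayDeque<>(); // newest first
    private Map<String, String> active = new HashMap<>();

    public void put(String key, String value) { active.put(key, value); }

    // Deletion does not remove anything; it writes a tombstone marker, which is
    // why old runs/files stay referenced until a compaction folds them away.
    public void delete(String key) { active.put(key, TOMBSTONE); }

    public void flush() { runs.addFirst(active); active = new HashMap<>(); }

    public String get(String key) {
        if (active.containsKey(key)) return value(active.get(key));
        for (Map<String, String> run : runs) {
            if (run.containsKey(key)) return value(run.get(key));
        }
        return null;
    }

    // Full compaction: merge every run into one and drop tombstones, so no old
    // run needs to be retained just to carry delete markers.
    public void fullCompact() {
        flush();
        Map<String, String> merged = new HashMap<>();
        // Iterate oldest -> newest so newer entries win.
        for (Iterator<Map<String, String>> it = runs.descendingIterator(); it.hasNext(); ) {
            merged.putAll(it.next());
        }
        merged.values().removeIf(TOMBSTONE::equals);
        runs.clear();
        runs.addFirst(merged);
    }

    public int tombstoneCount() {
        int n = 0;
        for (Map<String, String> run : runs) {
            for (String v : run.values()) if (TOMBSTONE.equals(v)) n++;
        }
        return n;
    }

    private static String value(String v) { return TOMBSTONE.equals(v) ? null : v; }

    public static void main(String[] args) {
        ToyLsm db = new ToyLsm();
        db.put("user-1", "data");
        db.flush();
        db.delete("user-1");
        db.flush();
        db.fullCompact();
        System.out.println(db.tombstoneCount());
    }
}
```

Recent tombstones being fine (as the comment notes) maps to compacting only runs older than the retention threshold rather than the whole key space.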
[jira] [Commented] (FLINK-27101) Periodically break the chain of incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17519060#comment-17519060 ] Steven Zhen Wu commented on FLINK-27101: [~pnowojski] [~yunta] would love to hear your thoughts > Periodically break the chain of incremental checkpoint > -- > > Key: FLINK-27101 > URL: https://issues.apache.org/jira/browse/FLINK-27101 > Project: Flink > Issue Type: New Feature > Components: Runtime / Checkpointing > Reporter: Steven Zhen Wu > Priority: Major
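Option 2's trigger policy (interval/cron-based full checkpoints, retrying full until one succeeds) is mostly bookkeeping and can be sketched in a few lines. The class and method names here are hypothetical, not Flink API:

```java
public class FullSnapshotScheduler {
    public enum Type { FULL, INCREMENTAL }

    private final long fullIntervalMillis;
    private long lastFullSuccess;
    private boolean fullPending; // a full checkpoint failed and must be retried

    public FullSnapshotScheduler(long fullIntervalMillis, long now) {
        this.fullIntervalMillis = fullIntervalMillis;
        this.lastFullSuccess = now;
    }

    // Decide the type of the next triggered checkpoint.
    public Type nextType(long now) {
        if (fullPending || now - lastFullSuccess >= fullIntervalMillis) {
            return Type.FULL;
        }
        return Type.INCREMENTAL;
    }

    public void onComplete(Type type, long now) {
        if (type == Type.FULL) {
            lastFullSuccess = now;
            fullPending = false;
        }
    }

    // Per the description: after a failed full checkpoint, keep attempting
    // full checkpoints until one succeeds.
    public void onFailure(Type type) {
        if (type == Type.FULL) {
            fullPending = true;
        }
    }

    public static void main(String[] args) {
        FullSnapshotScheduler s =
                new FullSnapshotScheduler(24L * 3600 * 1000, System.currentTimeMillis());
        System.out.println(s.nextType(System.currentTimeMillis())); // incremental right after start
    }
}
```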
[jira] [Updated] (FLINK-27101) Periodically break the chain of incremental checkpoint
[ https://issues.apache.org/jira/browse/FLINK-27101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-27101.
[jira] [Created] (FLINK-27101) Periodically break the chain of incremental checkpoint
Steven Zhen Wu created FLINK-27101: -- Summary: Periodically break the chain of incremental checkpoint Key: FLINK-27101 URL: https://issues.apache.org/jira/browse/FLINK-27101 Project: Flink Issue Type: New Feature Components: Runtime / Checkpointing Reporter: Steven Zhen Wu
[jira] [Comment Edited] (FLINK-21364) piggyback finishedSplitIds in RequestSplitEvent
[ https://issues.apache.org/jira/browse/FLINK-21364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17501576#comment-17501576 ] Steven Zhen Wu edited comment on FLINK-21364 at 3/4/22, 10:56 PM: -- actually, I think we should look into FLIP-182 FLINK-18450 first and see if Iceberg source can use the same mechanism for watermark alignment. cc [~sundaram] was (Author: stevenz3wu): actually, I think we should look into FLIP-182 FLINK-18450 first and see if Iceberg source can use the same mechanism for watermark alignment. > piggyback finishedSplitIds in RequestSplitEvent > --- > > Key: FLINK-21364 > URL: https://issues.apache.org/jira/browse/FLINK-21364 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common > Affects Versions: 1.12.1 > Reporter: Steven Zhen Wu > Priority: Minor > Labels: auto-deprioritized-major, pull-request-available > > For some split assignment strategies, the enumerator/assigner needs to track > the completed splits to advance the watermark for event-time alignment or rough > ordering. Right now, `RequestSplitEvent` for FLIP-27 sources doesn't support > passing along the `finishedSplitIds` info, and hence we have to create our own > custom source event type for the Iceberg source. > Here is the proposal to add such optional info to `RequestSplitEvent`. > {code} > public RequestSplitEvent( > @Nullable String hostName, > @Nullable Collection<String> finishedSplitIds) > {code}
[jira] [Commented] (FLINK-21364) piggyback finishedSplitIds in RequestSplitEvent
[ https://issues.apache.org/jira/browse/FLINK-21364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17501576#comment-17501576 ] Steven Zhen Wu commented on FLINK-21364: actually, I think we should look into FLIP-182 FLINK-18450 first and see if Iceberg source can use the same mechanism for watermark alignment. > piggyback finishedSplitIds in RequestSplitEvent > --- > > Key: FLINK-21364 > URL: https://issues.apache.org/jira/browse/FLINK-21364 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common > Affects Versions: 1.12.1 > Reporter: Steven Zhen Wu > Priority: Minor > Labels: auto-deprioritized-major, pull-request-available
[jira] [Commented] (FLINK-21364) piggyback finishedSplitIds in RequestSplitEvent
[ https://issues.apache.org/jira/browse/FLINK-21364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17501528#comment-17501528 ] Steven Zhen Wu commented on FLINK-21364: [~jqin] [~thw] [~pnowojski] I'd like to restart this discussion. Besides the proposal in this PR, we can also go with the other approach that Thomas mentioned, where we create a separate `FinishedSplitsEvent`. Please let me know your thoughts. > piggyback finishedSplitIds in RequestSplitEvent > --- > > Key: FLINK-21364 > URL: https://issues.apache.org/jira/browse/FLINK-21364 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common > Affects Versions: 1.12.1 > Reporter: Steven Zhen Wu > Priority: Minor > Labels: auto-deprioritized-major, pull-request-available
[jira] [Commented] (FLINK-21364) piggyback finishedSplitIds in RequestSplitEvent
[ https://issues.apache.org/jira/browse/FLINK-21364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17501525#comment-17501525 ] Steven Zhen Wu commented on FLINK-21364: [~pnowojski] sorry, I missed your message earlier. Yeah, the motivation is watermark alignment for the Iceberg source, where the alignment happens in the enumerator. Hence the enumerator needs to know which files/splits are completed in order to decide whether to advance the watermark. https://github.com/stevenzwu/iceberg/blob/flip27IcebergSource/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java#L89 For sources with unbounded splits (like Kafka), I believe the watermark alignment is done on the reader side. > piggyback finishedSplitIds in RequestSplitEvent > --- > > Key: FLINK-21364 > URL: https://issues.apache.org/jira/browse/FLINK-21364 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common > Affects Versions: 1.12.1 > Reporter: Steven Zhen Wu > Priority: Minor > Labels: auto-deprioritized-major, pull-request-available
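The alternative Thomas mentioned, a dedicated event instead of piggybacking on `RequestSplitEvent`, amounts to a small serializable `SourceEvent`. A sketch, using a local stand-in for Flink's `org.apache.flink.api.connector.source.SourceEvent` marker interface so the snippet is self-contained (the event class name is hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class FinishedSplitsEventDemo {

    // Stand-in for Flink's SourceEvent marker interface.
    interface SourceEvent extends Serializable {}

    // Reader -> enumerator event reporting which splits finished, so the
    // enumerator can decide whether to advance the watermark for alignment.
    static final class FinishedSplitsEvent implements SourceEvent {
        private final Collection<String> finishedSplitIds;

        FinishedSplitsEvent(Collection<String> finishedSplitIds) {
            this.finishedSplitIds = new ArrayList<>(finishedSplitIds);
        }

        Collection<String> finishedSplitIds() {
            return finishedSplitIds;
        }
    }

    // Source events travel between reader and enumerator, so they must
    // survive serialization; a round trip demonstrates that.
    static FinishedSplitsEvent roundTrip(FinishedSplitsEvent event)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(event);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (FinishedSplitsEvent) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FinishedSplitsEvent event = new FinishedSplitsEvent(List.of("file-split-3"));
        System.out.println(roundTrip(event).finishedSplitIds());
    }
}
```

In a real connector the reader would send this via its source-event channel and the enumerator would handle it in `handleSourceEvent`, as the Iceberg enumerator linked above does with its custom event type.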
[jira] [Commented] (FLINK-23843) Exceptions during "SplitEnumeratorContext.runInCoordinatorThread()" should cause Global Failure instead of Process Kill
[ https://issues.apache.org/jira/browse/FLINK-23843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17473207#comment-17473207 ] Steven Zhen Wu commented on FLINK-23843: I am not sure what the right way to fix it is, but Stephan's original description makes sense to me. We don't want an exception to kill the JobManager process; rather, a full job restart is expected in this case. > Exceptions during "SplitEnumeratorContext.runInCoordinatorThread()" should > cause Global Failure instead of Process Kill > --- > > Key: FLINK-23843 > URL: https://issues.apache.org/jira/browse/FLINK-23843 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination > Affects Versions: 1.13.2 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Priority: Critical > Fix For: 1.15.0 > > > Currently, when the method > "SplitEnumeratorContext.runInCoordinatorThread()" throws an exception, the > effect is a kill of the JobManager process. > The chain through which the process kill happens is: > * An exception bubbles up in the executor, killing the executor thread. > * The executor starts a replacement thread, which is forbidden by the thread > factory (as a safety net) and causes a process kill. > We should prevent such exceptions from bubbling up in the coordinator > executor.
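The direction described above, turning a throwable in the coordinator executor into a global job failure instead of letting the thread (and then the process) die, can be sketched as a wrapper around the submitted runnable. This is a simplified illustration, not Flink's actual coordinator code; the failure handler stands in for the coordinator's fail-job/global-failover path:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class GuardedCoordinatorExecutor {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Consumer<Throwable> globalFailureHandler;

    public GuardedCoordinatorExecutor(Consumer<Throwable> globalFailureHandler) {
        this.globalFailureHandler = globalFailureHandler;
    }

    // Run the action in the coordinator thread; any throwable triggers a
    // global failover instead of killing the executor thread (which would
    // otherwise trip the thread factory's safety net and kill the process).
    public void runInCoordinatorThread(Runnable action) {
        executor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                globalFailureHandler.accept(t); // fail the job, not the process
            }
        });
    }

    public void shutdown() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedCoordinatorExecutor coordinator = new GuardedCoordinatorExecutor(
                t -> System.out.println("global failover: " + t.getMessage()));
        coordinator.runInCoordinatorThread(() -> { throw new IllegalStateException("enumerator bug"); });
        coordinator.runInCoordinatorThread(() -> System.out.println("coordinator thread still alive"));
        coordinator.shutdown();
    }
}
```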
[jira] [Comment Edited] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418400#comment-17418400 ] Steven Zhen Wu edited comment on FLINK-23886 at 9/22/21, 3:58 PM: -- Just to add another data point. We observed the same issue with Flink 1.13.2 in production. We don't know how to reproduce this type of tricky state corruption problem.
{code}
org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element.
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:388)
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:145)
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:58)
	at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:285)
	at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:271)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:161)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:118)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:82)
	at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:71)
	at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:93)
	at org.apache.flink.contrib.streaming.state.RocksDBPriorityQueueSetFactory.create(RocksDBPriorityQueueSetFactory.java:113)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:476)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.createTimerPriorityQueue(InternalTimeServiceManagerImpl.java:174)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.registerOrGetTimerService(InternalTimeServiceManagerImpl.java:160)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.getInternalTimerService(InternalTimeServiceManagerImpl.java:136)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:620)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:225)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.EOFException
	at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:329)
	at org.apache.flink.types.StringValue.readString(StringValue.java:781)
	at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
	at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:31)
	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:161)
	at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:43)
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:386)
	... 25 more
{code}
was (Author: stevenz3wu): Just to add another data point. We observed the same issue with Flink 1.13.2 in production. We don't know how to reproduce this type of tricky state corruption problem > An exception is thrown out when recover job timers from checkpoint file > --- > > Key: FLINK-23886 > URL: https://issues.apache.org/jira/browse/FLINK-23886 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends > Affects Versions: 1.10.0, 1.11.3, 1.13.2 > Reporter: JING ZHANG >
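The `EOFException` at the bottom of the trace is the classic symptom of truncated or corrupted serialized state: the deserializer runs off the end of the buffer mid-element. A stdlib illustration of the failure mode (this is not Flink's `DataInputDeserializer`, just the same length-prefixed-string pattern):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

public class TruncatedReadDemo {

    // Mirrors reading a length-prefixed string element back from state bytes.
    public static String readBack(byte[] bytes) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(bytes)).readUTF();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeUTF("timer-key");
        byte[] full = bos.toByteArray();

        System.out.println(readBack(full)); // prints "timer-key"

        // Simulate corruption/truncation: the length prefix promises more
        // bytes than the buffer actually contains.
        byte[] truncated = Arrays.copyOf(full, full.length - 3);
        try {
            readBack(truncated);
        } catch (EOFException e) {
            System.out.println("EOFException on truncated bytes");
        }
    }
}
```

This is why the error surfaces only at restore time: the bytes were written (or uploaded) incompletely, and nothing reads them again until the timer state is deserialized during recovery.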
[jira] [Commented] (FLINK-23886) An exception is thrown out when recover job timers from checkpoint file
[ https://issues.apache.org/jira/browse/FLINK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418400#comment-17418400 ] Steven Zhen Wu commented on FLINK-23886: Just to add another data point. We observed the same issue with Flink 1.13.2 in production. We don't know how to reproduce this type of tricky state corruption problem.
> An exception is thrown out when recover job timers from checkpoint file
> ---
>
> Key: FLINK-23886
> URL: https://issues.apache.org/jira/browse/FLINK-23886
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.10.0, 1.11.3
> Reporter: JING ZHANG
> Priority: Major
> Attachments: image-2021-08-25-16-38-04-023.png, image-2021-08-25-16-38-12-308.png, image-2021-08-25-17-06-29-806.png, image-2021-08-25-17-07-38-327.png
>
> A user reported the bug in the [mailing list|http://mail-archives.apache.org/mod_mbox/flink-user/202108.mbox/%3ccakmsf43j14nkjmgjuy4dh5qn2vbjtw4tfh4pmmuyvcvfhgf...@mail.gmail.com%3E]. I paste the content here.
> Setup Specifics:
> Version: 1.6.2
> RocksDB Map State
> Timers stored in rocksdb
>
> When we have this job running for long periods of time like > 30 days, if for some reason the job restarts, we encounter "Error while deserializing the element". Is this a known issue fixed in later versions? I see some changes to code for FLINK-10175, but we don't use any queryable state.
>
> Below is the stack trace
>
> org.apache.flink.util.FlinkRuntimeException: Error while deserializing the element.
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.deserializeElement(RocksDBCachingPriorityQueueSet.java:389)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:146)
> at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:56)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:274)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:261)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:164)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftUp(HeapPriorityQueue.java:121)
> at org.apache.flink.runtime.state.heap.HeapPriorityQueue.addInternal(HeapPriorityQueue.java:85)
> at org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueue.add(AbstractHeapPriorityQueue.java:73)
> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.<init>(KeyGroupPartitionedPriorityQueue.java:89)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBPriorityQueueSetFactory.create(RocksDBKeyedStateBackend.java:2792)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.create(RocksDBKeyedStateBackend.java:450)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.createTimerPriorityQueue(InternalTimeServiceManager.java:121)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.registerOrGetTimerService(InternalTimeServiceManager.java:106)
> at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.getInternalTimerService(InternalTimeServiceManager.java:87)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getInternalTimerService(AbstractStreamOperator.java:764)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:61)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at org.apache.flink.types.StringValue.readString(StringValue.java:769)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:179)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:46)
> at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:168)
[jira] [Closed] (FLINK-10360) support timeout in savepoint REST api
[ https://issues.apache.org/jira/browse/FLINK-10360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu closed FLINK-10360. -- Resolution: Duplicate
> support timeout in savepoint REST api
> -
>
> Key: FLINK-10360
> URL: https://issues.apache.org/jira/browse/FLINK-10360
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing, Runtime / REST, Runtime / State Backends
> Reporter: Steven Zhen Wu
> Priority: Minor
> Labels: auto-deprioritized-major
>
> Right now, savepoints share the same timeout config as checkpoints. With incremental checkpoints, we just need to configure the timeout to be reasonable for the small delta. A savepoint, however, is a full snapshot, and the checkpoint timeout may be too small for it.
> * incremental checkpoint size ranges from 100 GB - 400 GB, and took 1 min - 4 mins
> * full savepoint is ~4.5 TB and took ~25 mins
> It will bring more flexibility if the savepoint REST api supports an additional timeout param and applies it as the actual savepoint timeout.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
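The proposal above amounts to one extra field in the savepoint trigger request. A minimal sketch of what that could look like, assuming a hypothetical `timeout` field (the existing Flink REST API does not have it) and a made-up JobManager address:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class SavepointRequestSketch {

    // JSON body for POST /jobs/:jobid/savepoints. "target-directory" and
    // "cancel-job" exist in the REST API today; "timeout" is the *proposed*
    // savepoint-specific parameter, not an existing field.
    static String requestBody(String targetDirectory, long timeoutMillis) {
        return "{\"target-directory\": \"" + targetDirectory + "\", "
                + "\"cancel-job\": false, "
                + "\"timeout\": " + timeoutMillis + "}";
    }

    // Builds (but does not send) the trigger request; the host is made up.
    static HttpRequest triggerSavepoint(String jobId, String body) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://jobmanager:8081/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

The JobManager side would then use the per-request value instead of the shared checkpoint timeout when it times out the savepoint.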
[jira] [Comment Edited] (FLINK-21513) Rethink up-/down-/restartingTime metrics
[ https://issues.apache.org/jira/browse/FLINK-21513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17329947#comment-17329947 ] Steven Zhen Wu edited comment on FLINK-21513 at 4/27/21, 3:41 AM: -- [~trohrmann] thanks for tagging me here. Yeah, availability can be a tricky concept for a Flink application. Users typically ask what the availability / uptime of their Flink application is (e.g. 4 nines). For micro services, availability can be measured as a success/failure rate in a naive way (there are more sophisticated and probably more accurate ways). How do we define availability for Flink? The current uptime metric doesn't capture availability. Also, availability can probably be captured at different time scales (last hour, last 4 hours, last 12 hours, last 24 hours, last week etc.). was (Author: stevenz3wu): [~trohrmann] thanks for tagging me here. Yeah, availability can be a tricky concept for a Flink application. Users typically ask what the availability / uptime of their Flink application is (4 nines). E.g., for micro services, availability can be measured as a success/failure rate in a naive way (there are more sophisticated and probably more accurate ways). How do we define availability for Flink? Uptime doesn't capture availability. Also, availability can probably be captured at different time scales (last hour, last 4 hours, last 12 hours, last 24 hours, last week etc.).
> Rethink up-/down-/restartingTime metrics
>
> Key: FLINK-21513
> URL: https://issues.apache.org/jira/browse/FLINK-21513
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination, Runtime / Metrics
> Reporter: Chesnay Schepler
> Priority: Major
> Labels: stale-major
> Fix For: 1.13.0
>
> While thinking about FLINK-21510 I stumbled upon some issues in the semantics of these metrics, both from a user perspective and from our own, and I think we need to clarify some things.
> h4. upTime
> This metric describes the time since the job transitioned to the RUNNING state.
> It is meant as a measure for how stable a deployment is.
> In the default scheduler this transition happens before we do any actual scheduling work, and as a result this also includes the time it takes for the JM to request slots and deploy tasks. In practice this means we start the timer once the job has been submitted and the JobMaster/Scheduler/EG have been initialized.
> For the adaptive scheduler this now puts us a bit into an odd situation because it first acquires slots before actually transitioning the EG into a RUNNING state, so as is we'd end up measuring 2 slightly different things.
> The question now is whether this is a problem.
> While we could certainly stick with the definition of "time since EG switched to RUNNING", it raises the question what the semantics of this metric are should a scheduler use a different data-structure than the EG.
> In other words, what I'm looking for is a definition that is independent from existing data-structures; a crude example could be "The time since the job is in a state where the deployment of a task is possible.".
> An alternative for the adaptive scheduler would be to measure the time since we transitioned to WaitingForResources, with which we would also include the slot acquisition, but it would be inconsistent with the logs and UI (because they only display an INITIALIZING job).
> h4. restartingTime
> This metric describes the time since the job transitioned into a RESTARTING state.
> It is meant as a measure for how long the recovery in case of a job failure takes.
> In the default scheduler this in practice is the time between a failure arriving at the JM and the cancellation of tasks being completed / restart backoff (whichever is higher).
> This is consistent with the semantics of the upTime metric, because upTime also includes the time required for acquiring slots and deploying tasks.
> For the adaptive scheduler we can follow similar semantics, by measuring the > time we spend in the {{Restarting}} state. > However, if we stick to the definition of upTime as time spent in RUNNING, > then we will end up with a gap for the time spent in WaitingForResources. > h4. downTime > This metric describes the time between the job transitioning from FAILING to > RUNNING. > It is meant as a measure for how long the recovery in case of a job failure > takes. > You may be wondering what the difference between {{downTime}} and > {{restartingTime}} is meant to be. Unfortunately I do not have the answer to > that. > Presumably, at the time they were added, they were covering different parts > of the recovery process, but since we never documented these steps explicitly > the exact semantics are
[jira] [Commented] (FLINK-21513) Rethink up-/down-/restartingTime metrics
[ https://issues.apache.org/jira/browse/FLINK-21513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17329947#comment-17329947 ] Steven Zhen Wu commented on FLINK-21513: [~trohrmann] thanks for tagging me here. Yeah, availability can be a tricky concept for a Flink application. Users typically ask what the availability / uptime of their Flink application is (4 nines). E.g., for micro services, availability can be measured as a success/failure rate in a naive way (there are more sophisticated and probably more accurate ways). How do we define availability for Flink? Uptime doesn't capture availability. Also, availability can probably be captured at different time scales (last hour, last 4 hours, last 12 hours, last 24 hours, last week etc.).
> Rethink up-/down-/restartingTime metrics
>
> Key: FLINK-21513
> URL: https://issues.apache.org/jira/browse/FLINK-21513
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination, Runtime / Metrics
> Reporter: Chesnay Schepler
> Priority: Major
> Labels: stale-major
> Fix For: 1.13.0
>
> While thinking about FLINK-21510 I stumbled upon some issues in the semantics of these metrics, both from a user perspective and from our own, and I think we need to clarify some things.
> h4. upTime
> This metric describes the time since the job transitioned to the RUNNING state.
> It is meant as a measure for how stable a deployment is.
> In the default scheduler this transition happens before we do any actual scheduling work, and as a result this also includes the time it takes for the JM to request slots and deploy tasks. In practice this means we start the timer once the job has been submitted and the JobMaster/Scheduler/EG have been initialized.
> For the adaptive scheduler this now puts us a bit into an odd situation because it first acquires slots before actually transitioning the EG into a RUNNING state, so as is we'd end up measuring 2 slightly different things.
> The question now is whether this is a problem. > While we could certainly stick with the definition of "time since EG switched > to RUNNING", it raises the question what the semantics of this metric are > should a scheduler use a different data-structure than the EG. > In other words, what I'm looking for is a definition that is independent from > existing data-structures; a crude example could be "The time since the job is > in a state where the deployment of a task is possible.". > An alternative for the adaptive scheduler would be to measure the time since > we transitioned to WaitingForResources, with which we would also include the > slot acquisition, but it would be inconsistent with the logs and UI (because > they only display an INITIALIZING job). > h4. restartingTime > This metric describes the time since the job transitioned into a RESTARTING > state. > It is meant as a measure for how long the recovery in case of a job failure > takes. > In the default scheduler this in practice is the time between a failure > arriving at the JM and the cancellation of tasks being completed / restart > backoff (whichever is higher). > This is consistent with the semantics of the upTime metric, because upTime > also includes the time required for acquiring slots and deploying tasks. > For the adaptive scheduler we can follow similar semantics, by measuring the > time we spend in the {{Restarting}} state. > However, if we stick to the definition of upTime as time spent in RUNNING, > then we will end up with a gap for the time spent in WaitingForResources. > h4. downTime > This metric describes the time between the job transitioning from FAILING to > RUNNING. > It is meant as a measure for how long the recovery in case of a job failure > takes. > You may be wondering what the difference between {{downTime}} and > {{restartingTime}} is meant to be. Unfortunately I do not have the answer to > that. 
> Presumably, at the time they were added, they were covering different parts > of the recovery process, but since we never documented these steps explicitly > the exact semantics are no longer clear and there are no specs that a > scheduler can follow. > Furthermore, this metric is currently broken because a FAILING job _never_ > transitions into RUNNING anymore. > The default scheduler transitions from RUNNING -> RESTARTING -> RUNNING, > whereas the adaptive scheduler cancels the job and creates a new EG. > As it is we could probably merge downTime and restartingTime because they > seem to cover the exact same thing. -- This message was sent by Atlassian Jira (v8.3.4#803005)
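To make the availability question in the comment concrete: one naive definition (purely an illustration, not an existing Flink metric) is uptime divided by total wall time over an observation window, which the up-/down-/restartingTime metrics alone do not give you:

```java
public class AvailabilitySketch {

    // Naive availability over a window: fraction of time the job was up.
    // upMillis/downMillis would be derived from the scheduler's
    // state transitions over that window.
    static double availability(long upMillis, long downMillis) {
        long total = upMillis + downMillis;
        return total == 0 ? 1.0 : (double) upMillis / total;
    }

    // Check against an availability target such as "four nines" (0.9999).
    static boolean meetsTarget(long upMillis, long downMillis, double target) {
        return availability(upMillis, downMillis) >= target;
    }
}
```

Evaluating this over several window sizes (last hour, last day, last week) gives the multi-scale view the comment asks for.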
[jira] [Comment Edited] (FLINK-21364) piggyback finishedSplitIds in RequestSplitEvent
[ https://issues.apache.org/jira/browse/FLINK-21364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17284024#comment-17284024 ] Steven Zhen Wu edited comment on FLINK-21364 at 2/12/21, 10:57 PM: --- For a pull-based source (like file/Iceberg), it is probably more natural/efficient to piggyback the `finishedSplitIds` in the `RequestSplitEvent`. A reader requests a new split when the current split is done. It doesn't mean that a reader has to request a new split when finishing some splits, as in the bounded Kafka source case. You have a good point that some sources (like Kafka/Kinesis) may still need to communicate the watermark info to the coordinator/enumerator. In this case, it definitely will be a separate type of event (like `WatermarkUpdateEvent`). In our Iceberg source use cases, readers didn't actually report watermarks. They just need to report which splits are finished. All the ordering/watermark tracking is centralized in the Iceberg source coordinator. But I can see that this may not be a very generic scenario to change the `RequestSplitEvent` in flink-runtime. cc [~sundaram] was (Author: stevenz3wu): For a pull-based source (like file/Iceberg), it is probably more natural/efficient to piggyback the `finishedSplitIds` in the `RequestSplitEvent`. A reader requests a new split when the current split is done. It doesn't mean that a reader has to request a new split when finishing some splits, as in the bounded Kafka source case. You have a good point that some sources (like Kafka/Kinesis) may still need to communicate the watermark info to the coordinator/enumerator. In this case, it definitely will be a separate type of event (like `WatermarkUpdateEvent`). In our Iceberg source use cases, readers didn't actually report watermarks. They just need to report which splits are finished. But I can see that this may not be a very generic scenario to change the `RequestSplitEvent` in flink-runtime.
cc [~sundaram]
> piggyback finishedSplitIds in RequestSplitEvent
> ---
>
> Key: FLINK-21364
> URL: https://issues.apache.org/jira/browse/FLINK-21364
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Common
> Affects Versions: 1.12.1
> Reporter: Steven Zhen Wu
> Priority: Major
> Labels: pull-request-available
>
> For some split assignment strategies, the enumerator/assigner needs to track the completed splits to advance the watermark for event time alignment or rough ordering. Right now, `RequestSplitEvent` for FLIP-27 sources doesn't support pass-along of the `finishedSplitIds` info and hence we have to create our own custom source event type for the Iceberg source.
> Here is the proposal to add such optional info to `RequestSplitEvent`.
> {code}
> public RequestSplitEvent(
>     @Nullable String hostName,
>     @Nullable Collection<String> finishedSplitIds)
> {code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
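A standalone sketch of the proposed event shape (a mock for illustration, not the actual Flink `RequestSplitEvent` class; the `Collection<String>` element type is an assumption based on FLIP-27 split IDs being strings, and the nullable annotations are replaced by comments to keep it dependency-free):

```java
import java.util.Collection;
import java.util.Collections;

public class RequestSplitEventSketch {

    private final String hostName;                     // nullable, as today
    private final Collection<String> finishedSplitIds; // nullable, the proposed addition

    public RequestSplitEventSketch(String hostName, Collection<String> finishedSplitIds) {
        this.hostName = hostName;
        this.finishedSplitIds = finishedSplitIds;
    }

    // Readers that don't report finished splits simply pass null,
    // which keeps the existing behavior unchanged (backward compatible).
    public Collection<String> finishedSplitIds() {
        return finishedSplitIds == null ? Collections.emptyList() : finishedSplitIds;
    }

    public String hostName() {
        return hostName;
    }
}
```

Because both fields are optional, existing callers that only report a host name need no changes, which is the backward-compatibility argument made above.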
[jira] [Commented] (FLINK-21364) piggyback finishedSplitIds in RequestSplitEvent
[ https://issues.apache.org/jira/browse/FLINK-21364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17284024#comment-17284024 ] Steven Zhen Wu commented on FLINK-21364: For pull based source (like file/Iceberg), it is probably more natural/efficient to piggyback the `finishedSplitsIds` in the `RequestSplitEvent`. A reader request a new split when the current split is done. It doesn't mean that a reader has to request for a new split when finishing some splits, like bounded Kafka source case. You have a good point that some sources (like Kafka/Kineses) may still need to communicate the watermark info to coordinator/enumerator. In this case, it definitely will be a separate type of event (like `WatermarkUpdateEvent`). In our Iceberg source use cases, readers didn't actually report watermark. They just need to report which split are finished. But I can see that this may not be a very generic scenario to change the `RequestSplitEvent` in flink-runtime. cc [~sundaram] > piggyback finishedSplitIds in RequestSplitEvent > --- > > Key: FLINK-21364 > URL: https://issues.apache.org/jira/browse/FLINK-21364 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.12.1 >Reporter: Steven Zhen Wu >Priority: Major > Labels: pull-request-available > > For some split assignment strategy, the enumerator/assigner needs to track > the completed splits to advance watermark for event time alignment or rough > ordering. Right now, `RequestSplitEvent` for FLIP-27 source doesn't support > pass-along of the `finishedSplitIds` info and hence we have to create our own > custom source event type for Iceberg source. > Here is the proposal of add such optional info to `RequestSplitEvent`. > {code} > public RequestSplitEvent( > @Nullable String hostName, > @Nullable Collection finishedSplitIds) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21364) piggyback finishedSplitIds in RequestSplitEvent
[ https://issues.apache.org/jira/browse/FLINK-21364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-21364: --- Description: For some split assignment strategy, the enumerator/assigner needs to track the completed splits to advance watermark for event time alignment or rough ordering. Right now, `RequestSplitEvent` for FLIP-27 source doesn't support pass-along of the `finishedSplitIds` info and hence we have to create our own custom source event type for Iceberg source. Here is the proposal of add such optional info to `RequestSplitEvent`. {code} public RequestSplitEvent( @Nullable String hostName, @Nullable Collection finishedSplitIds) {code} was: For some split assignment strategy, the enumerator/assigner needs to track the completed splits to advance watermark for event time alignment or rough ordering. Right now, `RequestSplitEvent` for FLIP-27 source doesn't support pass-along of the `finishedSplitIds` info and hence we have to create our own custom source event type for Iceberg source. Here is the proposal of add such optional info to `RequestSplitEvent`. ``` public RequestSplitEvent( @Nullable String hostName, @Nullable Collection finishedSplitIds) ``` > piggyback finishedSplitIds in RequestSplitEvent > --- > > Key: FLINK-21364 > URL: https://issues.apache.org/jira/browse/FLINK-21364 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.12.1 >Reporter: Steven Zhen Wu >Priority: Major > > For some split assignment strategy, the enumerator/assigner needs to track > the completed splits to advance watermark for event time alignment or rough > ordering. Right now, `RequestSplitEvent` for FLIP-27 source doesn't support > pass-along of the `finishedSplitIds` info and hence we have to create our own > custom source event type for Iceberg source. > Here is the proposal of add such optional info to `RequestSplitEvent`. 
> {code}
> public RequestSplitEvent(
>     @Nullable String hostName,
>     @Nullable Collection<String> finishedSplitIds)
> {code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21364) piggyback finishedSplitIds in RequestSplitEvent
[ https://issues.apache.org/jira/browse/FLINK-21364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17283152#comment-17283152 ] Steven Zhen Wu commented on FLINK-21364: cc [~sewen] [~jqin] [~thomasWeise] > piggyback finishedSplitIds in RequestSplitEvent > --- > > Key: FLINK-21364 > URL: https://issues.apache.org/jira/browse/FLINK-21364 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.12.1 >Reporter: Steven Zhen Wu >Priority: Major > > For some split assignment strategy, the enumerator/assigner needs to track > the completed splits to advance watermark for event time alignment or rough > ordering. Right now, `RequestSplitEvent` for FLIP-27 source doesn't support > pass-along of the `finishedSplitIds` info and hence we have to create our own > custom source event type for Iceberg source. > Here is the proposal of add such optional info to `RequestSplitEvent`. > ``` > public RequestSplitEvent( > @Nullable String hostName, > @Nullable Collection finishedSplitIds) > ``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21364) piggyback finishedSplitIds in RequestSplitEvent
Steven Zhen Wu created FLINK-21364: -- Summary: piggyback finishedSplitIds in RequestSplitEvent Key: FLINK-21364 URL: https://issues.apache.org/jira/browse/FLINK-21364 Project: Flink Issue Type: Improvement Components: Connectors / Common Affects Versions: 1.12.1 Reporter: Steven Zhen Wu For some split assignment strategy, the enumerator/assigner needs to track the completed splits to advance watermark for event time alignment or rough ordering. Right now, `RequestSplitEvent` for FLIP-27 source doesn't support pass-along of the `finishedSplitIds` info and hence we have to create our own custom source event type for Iceberg source. Here is the proposal of add such optional info to `RequestSplitEvent`. ``` public RequestSplitEvent( @Nullable String hostName, @Nullable Collection finishedSplitIds) ``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267590#comment-17267590 ] Steven Zhen Wu edited comment on FLINK-20174 at 1/19/21, 6:59 PM: -- [~sewen] [~lzljs3620320] Can you take a look if this is the right direction for fixing this problem? [https://github.com/stevenzwu/flink/commit/f5bb05ef74a1ae10e20b13c8796731378da076c1] Should we also consider moving `BulkFormat` to `flink-connector-base`? was (Author: stevenz3wu): [~sewen] [~lzljs3620320] Can you take a look if this is the right direction for fixing this problem? [https://github.com/stevenzwu/flink/commit/f5bb05ef74a1ae10e20b13c8796731378da076c1]
> Make BulkFormat more extensible
> ---
>
> Key: FLINK-20174
> URL: https://issues.apache.org/jira/browse/FLINK-20174
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 1.12.0
> Reporter: Steven Zhen Wu
> Priority: Major
>
> Right now, BulkFormat has the generic `SplitT` type extending from `FileSourceSplit`. We can make BulkFormat take a generic `SplitT` type extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to extend from `FileSourceSplit`, and the Iceberg source can reuse this BulkFormat interface as [~lzljs3620320] suggested. This allows the Iceberg source to take advantage of the high-performance `ParquetVectorizedInputFormat` provided by Flink.
> [~sewen] [~lzljs3620320] if you are on board with the change, I would be happy to submit a PR. Since it is a breaking change, maybe we can only add it to the master branch after the 1.12 release branch is cut?
> The other related question is the two `createReader` and `restoreReader` APIs. I understand the motivation. I am just wondering if the separation is necessary. If the SplitT has the CheckpointedPosition, the seek operation can be handled internally in `createReader`.
We can also define an abstract > `FileSourceSplitBase` that adds a `getCheckpointedPosition` API to the > `SourceSplit`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
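The generic-bound change can be illustrated with mock interfaces (these are stand-ins for the real Flink types, not reproductions of them): relaxing `SplitT extends FileSourceSplit` to `SplitT extends SourceSplit` lets a non-file split, such as an Iceberg split, parameterize `BulkFormat` directly.

```java
public class BulkFormatBoundSketch {

    // Mock stand-ins for the Flink interfaces.
    interface SourceSplit { String splitId(); }
    interface FileSourceSplit extends SourceSplit { }

    // Proposed bound: SplitT only needs to be a SourceSplit,
    // not a FileSourceSplit.
    interface BulkFormat<T, SplitT extends SourceSplit> {
        String describe(SplitT split);
    }

    // A split type that does NOT extend FileSourceSplit...
    static class IcebergSplit implements SourceSplit {
        public String splitId() { return "iceberg-0"; }
    }

    // ...can now be used as the SplitT parameter of BulkFormat.
    static String demo() {
        BulkFormat<String, IcebergSplit> format = s -> "reading " + s.splitId();
        return format.describe(new IcebergSplit());
    }
}
```

With the current bound, `BulkFormat<String, IcebergSplit>` would simply not compile unless `IcebergSplit` extended the file-specific split class, which is the coupling the issue wants to remove.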
[jira] [Commented] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267590#comment-17267590 ] Steven Zhen Wu commented on FLINK-20174: [~sewen] [~lzljs3620320] Can you take a look if this is the right direction for fixing this problem? [https://github.com/stevenzwu/flink/commit/f5bb05ef74a1ae10e20b13c8796731378da076c1] > Make BulkFormat more extensible > --- > > Key: FLINK-20174 > URL: https://issues.apache.org/jira/browse/FLINK-20174 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.12.0 >Reporter: Steven Zhen Wu >Priority: Major > > Right now, BulkFormat has the generic `SpitT` type extending from > `FileSourceSplit`. We can make BulkFormat taking the generic `SplitT` type > extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to > extend from `FileSourceSplit` and Iceberg source can reuse this BulkFormat > interface as [~lzljs3620320] suggested. This allows Iceberg source to take > advantages high-performant `ParquetVectorizedInputFormat` provided by Flink. > [~sewen] [~lzljs3620320] if you are onboard with the change, I would be happy > to submit a PR. Since it is a breaking change, maybe we can only add it to > master branch after 1.12 release branch is cut? > The other related question is the two `createReader` and `restoreReader` > APIs. I understand the motivation. I am just wondering if the separation is > necessary. if the SplitT has the CheckpointedLocation, the seek operation can > be handled internal to `createReader`. We can also define an abstract > `FileSourceSplitBase` that adds a `getCheckpointedPosition` API to the > `SourceSplit`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21000) set MetricGroup in SourceCoordinatorContext
[ https://issues.apache.org/jira/browse/FLINK-21000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266679#comment-17266679 ] Steven Zhen Wu commented on FLINK-21000: I tried to trace the code up the chain: [https://github.com/stevenzwu/flink/commit/fb29be664b24c7b1ac13ca98393e77b0a8213888] But I got stuck in `StreamExecutionEnvironment#getStreamGraphGenerator()`. I am not sure how `StreamExecutionEnvironment` can get the `MetricGroup` or `MetricRegistry`.
> set MetricGroup in SourceCoordinatorContext
> ---
>
> Key: FLINK-21000
> URL: https://issues.apache.org/jira/browse/FLINK-21000
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.12.0
> Reporter: Steven Zhen Wu
> Priority: Major
>
> Right now, SourceCoordinatorContext returns null. We need a valid metric group to publish source metrics (e.g. number of pending splits) in the source enumerator.
> cc [~sewen] [~jqin]
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21000) set MetricGroup in SourceCoordinatorContext
Steven Zhen Wu created FLINK-21000: -- Summary: set MetricGroup in SourceCoordinatorContext Key: FLINK-21000 URL: https://issues.apache.org/jira/browse/FLINK-21000 Project: Flink Issue Type: Bug Components: Connectors / Common Affects Versions: 1.12.0 Reporter: Steven Zhen Wu Right now, SourceCoordinatorContext returns null. We need a valid metric group to publish source metrics (e.g. number of pending splits) in source enumerator. cc [~sewen] [~jqin] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-20871) Make DataStream#executeAndCollectWithClient() public
[ https://issues.apache.org/jira/browse/FLINK-20871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu closed FLINK-20871. -- Resolution: Invalid > Make DataStream#executeAndCollectWithClient() public > > > Key: FLINK-20871 > URL: https://issues.apache.org/jira/browse/FLINK-20871 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.12.0 >Reporter: Steven Zhen Wu >Priority: Major > Labels: pull-request-available > > Right now, `DataStreamUtils#collectWithClient` is marked as deprecated in > favor of the `DataStream#executeAndCollect()`. However, some integration > tests (e.g. > [FileSourceTextLinesITCase|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java#L187]) > need the `DataStream#executeAndCollectWithClient` API to get JobClient to > cancel the job after collected required output for unbounded source test. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-20871) Make DataStream#executeAndCollectWithClient() public
[ https://issues.apache.org/jira/browse/FLINK-20871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-20871: --- Description: Right now, `DataStreamUtils#collectWithClient` is marked as deprecated in favor of the `DataStream#executeAndCollect()`. However, some integration tests (e.g. [FileSourceTextLinesITCase|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java#L187]) need the `DataStream#executeAndCollectWithClient` API to get JobClient to cancel the job after collected required output for unbounded source test. (was: Right now, `DataStreamUtils#collectWithClient` is marked as deprecated in favor of the `DataStream#executeAndCollect()`. However, some integration tests (e.g. [FileSourceTextLinesITCase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java#L187)) need the `DataStream#executeAndCollectWithClient` API to get JobClient to cancel the job after collected required output for unbounded source test.) > Make DataStream#executeAndCollectWithClient() public > > > Key: FLINK-20871 > URL: https://issues.apache.org/jira/browse/FLINK-20871 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.12.0 >Reporter: Steven Zhen Wu >Priority: Major > > Right now, `DataStreamUtils#collectWithClient` is marked as deprecated in > favor of the `DataStream#executeAndCollect()`. However, some integration > tests (e.g. > [FileSourceTextLinesITCase|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java#L187]) > need the `DataStream#executeAndCollectWithClient` API to get JobClient to > cancel the job after collected required output for unbounded source test. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-20871) Make DataStream#executeAndCollectWithClient() public
[ https://issues.apache.org/jira/browse/FLINK-20871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-20871: --- Summary: Make DataStream#executeAndCollectWithClient() public (was: Make DataStream#executeAndCollectWithClient public) > Make DataStream#executeAndCollectWithClient() public > > > Key: FLINK-20871 > URL: https://issues.apache.org/jira/browse/FLINK-20871 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.12.0 >Reporter: Steven Zhen Wu >Priority: Major > > Right now, `DataStreamUtils#collectWithClient` is marked as deprecated in > favor of the `DataStream#executeAndCollect()`. However, some integration > tests (e.g. > [FileSourceTextLinesITCase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java#L187)) > need the `DataStream#executeAndCollectWithClient` API to get JobClient to > cancel the job after collected required output for unbounded source test. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20871) Make DataStream#executeAndCollectWithClient public
Steven Zhen Wu created FLINK-20871: -- Summary: Make DataStream#executeAndCollectWithClient public Key: FLINK-20871 URL: https://issues.apache.org/jira/browse/FLINK-20871 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.12.0 Reporter: Steven Zhen Wu Right now, `DataStreamUtils#collectWithClient` is marked as deprecated in favor of `DataStream#executeAndCollect()`. However, some integration tests (e.g. [FileSourceTextLinesITCase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java#L187)) need the `DataStream#executeAndCollectWithClient` API to get a JobClient to cancel the job after collecting the required output for the unbounded source test. -- This message was sent by Atlassian Jira (v8.3.4#803005)
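The test pattern this ticket needs can be sketched like this. Everything below is a hypothetical, simplified stand-in (the real API is `DataStreamUtils#collectWithClient` / the requested `DataStream#executeAndCollectWithClient`, which return a result iterator together with a `JobClient`): with an unbounded source the job never finishes on its own, so the test must hold the client in order to cancel once enough output has been collected.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for Flink's JobClient.
interface JobClient {
    void cancel();
}

// Simplified stand-in for the pair returned by collectWithClient:
// the result iterator plus a handle to the running job.
class CollectResult<T> {
    final Iterator<T> iterator;
    final JobClient client;

    CollectResult(Iterator<T> iterator, JobClient client) {
        this.iterator = iterator;
        this.client = client;
    }
}

class UnboundedSourceTestHelper {
    // Drain exactly `expected` records, then cancel the still-running job.
    static <T> List<T> collectAndCancel(CollectResult<T> result, int expected) {
        List<T> out = new ArrayList<>();
        while (out.size() < expected && result.iterator.hasNext()) {
            out.add(result.iterator.next());
        }
        // Without the JobClient, a test over an unbounded source would hang here.
        result.client.cancel();
        return out;
    }
}
```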
[jira] [Commented] (FLINK-20575) flink application failed to restore from check-point
[ https://issues.apache.org/jira/browse/FLINK-20575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17248056#comment-17248056 ] Steven Zhen Wu commented on FLINK-20575: Just from personal experience: because this is an interrupted exception, it may be a symptom of a job cancellation getting stuck and later being aborted forcefully. Maybe check whether there are any other exceptions before this one across all TMs. > flink application failed to restore from check-point > > > Key: FLINK-20575 > URL: https://issues.apache.org/jira/browse/FLINK-20575 > Project: Flink > Issue Type: Bug >Affects Versions: 1.9.1 >Reporter: Yu Yang >Priority: Major > > Our flink application failed to restore from a check-point due to > com.amazonaws.AbortedException (we use the s3a file system). Initially we > thought that the s3 file had some issue. It turned out that we could download > the s3 file fine. Any insights on this issue will be very welcome. > > 2020-12-11 07:02:40,018 ERROR > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - > Caught unexpected exception. 
> java.io.InterruptedIOException: getFileStatus on s3a://mybucket/prod/checkpoints/u/tango/910d2ff2b2c7e01e99a9588d11385e92/shared/f245da83-fc01-424d-9719-d48b99a1ed35: org.apache.flink.fs.s3base.shaded.com.amazonaws.AbortedException:
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2187)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
> at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.open(HadoopFileSystem.java:120)
> at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.open(HadoopFileSystem.java:37)
> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
> at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:127)
> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
> at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
> at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
> at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
> at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
> at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:66)
> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreDBInstanceFromStateHandle(RocksDBIncrementalRestoreOperation.java:406)
> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithRescaling(RocksDBIncrementalRestoreOperation.java:294)
> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:146)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
> at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> at
[jira] [Commented] (FLINK-20390) Programmatic access to the back-pressure
[ https://issues.apache.org/jira/browse/FLINK-20390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17239806#comment-17239806 ] Steven Zhen Wu commented on FLINK-20390: Not sure what the input/source is. If it is Kafka, would Kafka consumer lag (either in the number of msgs or in wall clock time) be a good trigger? If it is lagging too much, it means that the job can't keep up with the input load. Then start dropping/sampling msgs. > Programmatic access to the back-pressure > > > Key: FLINK-20390 > URL: https://issues.apache.org/jira/browse/FLINK-20390 > Project: Flink > Issue Type: New Feature > Components: API / Core >Reporter: Gaël Renoux >Priority: Major > > It would be useful to access the back-pressure monitoring from within > functions. > Here is our use case: we have a real-time Flink job, which takes decisions > based on input data. Sometimes, we have traffic spikes on the input and the > decision process cannot process records fast enough. Back-pressure starts > mounting, all the way back to the Source. What we want to do is to start > dropping records in this case, because it's better to make decisions based on > just a sample of the data than to accumulate too much lag. > Right now, the only way is to have a filter with a hard limit on the number > of records per interval of time, and to drop records once we are over this > limit. However, this requires a lot of tuning to find out what the correct > limit is, especially since it may depend on the nature of the inputs (some > decisions take longer to make than others). It's also heavily dependent on > the buffers: the limit needs to be low enough that all records that pass the > limit can fit in the downstream buffers, or the back-pressure will go > back past the filtering task and we're back to square one. Finally, it's not > very resilient to change: whenever we scale the infrastructure up, we need to > redo the whole tuning exercise. 
> With programmatic access to the back-pressure, we could simply start dropping > records based on its current level. No tuning, and adjusted to the actual > issue. For performance, I assume it would be better if it reused the existing > back-pressure monitoring mechanism, rather than looking directly into the > buffer. A sampling of the back-pressure should be enough, and if more > precision is needed you can simply change the existing back-pressure > configuration. -- This message was sent by Atlassian Jira (v8.3.4#803005)
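The hand-tuned workaround described in the ticket (a hard limit on records per interval of time, dropping the rest) can be sketched as a plain class. This is a hypothetical illustration, not a Flink API; in a real job this logic would sit inside a filter function, and the limit is exactly the value the reporter says is painful to tune:

```java
// Minimal sketch of a per-interval record limiter: pass at most `limit`
// records per fixed window of `intervalMillis`, drop everything over budget.
// Time is passed in explicitly to keep the class deterministic and testable.
class IntervalLimitFilter<T> {
    private final int limit;
    private final long intervalMillis;
    private long windowStart;
    private int passedInWindow;

    IntervalLimitFilter(int limit, long intervalMillis, long nowMillis) {
        this.limit = limit;
        this.intervalMillis = intervalMillis;
        this.windowStart = nowMillis;
    }

    // Returns true if the record should be kept, given the current time.
    boolean filter(T record, long nowMillis) {
        if (nowMillis - windowStart >= intervalMillis) {
            windowStart = nowMillis;  // start a new window
            passedInWindow = 0;
        }
        if (passedInWindow < limit) {
            passedInWindow++;
            return true;
        }
        return false;  // over the per-interval budget: drop the record
    }
}
```

The ticket's point is that `limit` must be re-tuned whenever the workload or infrastructure changes, which is why a back-pressure-driven signal would be preferable.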
[jira] [Commented] (FLINK-20114) Test Kafka Source based on the new Source API
[ https://issues.apache.org/jira/browse/FLINK-20114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233762#comment-17233762 ] Steven Zhen Wu commented on FLINK-20114: [~rmetzger] sorry. it might be difficult. We were having some production issues last week that we need to address in the next 2 weeks. We do plan to test 1.12 as soon as we can though. > Test Kafka Source based on the new Source API > -- > > Key: FLINK-20114 > URL: https://issues.apache.org/jira/browse/FLINK-20114 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Affects Versions: 1.12.0 >Reporter: Robert Metzger >Assignee: Jiangjie Qin >Priority: Critical > Fix For: 1.12.0 > > > Feature introduced in https://issues.apache.org/jira/browse/FLINK-18323 > > [General Information about the Flink 1.12 release > testing|https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing] > When testing a feature, consider the following aspects: > - Is the documentation easy to understand > - Are the error messages, log messages, APIs etc. easy to understand > - Is the feature working as expected under normal conditions > - Is the feature working / failing as expected with invalid input, induced > errors etc. > If you find a problem during testing, please file a ticket > (Priority=Critical; Fix Version = 1.12.0), and link it in this testing ticket. > During the testing, and once you are finished, please write a short summary > of all things you have tested. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233755#comment-17233755 ] Steven Zhen Wu edited comment on FLINK-20174 at 11/17/20, 5:23 PM: --- I tried the change of setting up the InputFiles map for DeleteFilter per FileScanTask. I was wrong earlier. `TestFlinkInputFormatReaderDeletes` works fine after [the change|https://github.com/stevenzwu/iceberg/pull/4]. So I guess the only reason left is the throughput benefit from combining small files into a decent-sized CombinedScanTask, which is still an important reason. was (Author: stevenz3wu): I tried the change of setting up InputFiles map for DeleteFilter per FileScanTask. I was wrong earlier. It actually works fine. You can see the change here. https://github.com/stevenzwu/iceberg/pull/4 So I guess the only reason left is the throughput benefit from combining small files into a decent sized CombinedScanTask, which is still an important reason. > Make BulkFormat more extensible > --- > > Key: FLINK-20174 > URL: https://issues.apache.org/jira/browse/FLINK-20174 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.12.0 >Reporter: Steven Zhen Wu >Priority: Major > > Right now, BulkFormat has the generic `SplitT` type extending from > `FileSourceSplit`. We can make BulkFormat take a generic `SplitT` type > extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to > extend from `FileSourceSplit` and the Iceberg source can reuse this BulkFormat > interface as [~lzljs3620320] suggested. This allows the Iceberg source to take > advantage of the high-performance `ParquetVectorizedInputFormat` provided by Flink. > [~sewen] [~lzljs3620320] if you are on board with the change, I would be happy > to submit a PR. Since it is a breaking change, maybe we can only add it to > the master branch after the 1.12 release branch is cut? > The other related question is the two `createReader` and `restoreReader` > APIs. I understand the motivation. I am just wondering if the separation is > necessary. If the SplitT has a CheckpointedPosition, the seek operation can > be handled internally in `createReader`. We can also define an abstract > `FileSourceSplitBase` that adds a `getCheckpointedPosition` API to the > `SourceSplit`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233755#comment-17233755 ] Steven Zhen Wu commented on FLINK-20174: I tried the change of setting up the InputFiles map for DeleteFilter per FileScanTask. I was wrong earlier. It actually works fine. You can see the change here: https://github.com/stevenzwu/iceberg/pull/4 So I guess the only reason left is the throughput benefit from combining small files into a decent-sized CombinedScanTask, which is still an important reason. > Make BulkFormat more extensible > --- > > Key: FLINK-20174 > URL: https://issues.apache.org/jira/browse/FLINK-20174 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.12.0 >Reporter: Steven Zhen Wu >Priority: Major > > Right now, BulkFormat has the generic `SplitT` type extending from > `FileSourceSplit`. We can make BulkFormat take a generic `SplitT` type > extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to > extend from `FileSourceSplit` and the Iceberg source can reuse this BulkFormat > interface as [~lzljs3620320] suggested. This allows the Iceberg source to take > advantage of the high-performance `ParquetVectorizedInputFormat` provided by Flink. > [~sewen] [~lzljs3620320] if you are on board with the change, I would be happy > to submit a PR. Since it is a breaking change, maybe we can only add it to > the master branch after the 1.12 release branch is cut? > The other related question is the two `createReader` and `restoreReader` > APIs. I understand the motivation. I am just wondering if the separation is > necessary. If the SplitT has a CheckpointedPosition, the seek operation can > be handled internally in `createReader`. We can also define an abstract > `FileSourceSplitBase` that adds a `getCheckpointedPosition` API to the > `SourceSplit`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
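The generic-bound change proposed in this issue can be sketched as follows. This is a hypothetical illustration: the `SourceSplit` and `BulkFormat` types below are heavily simplified stand-ins for the Flink interfaces of the same names (real `BulkFormat` takes a configuration, returns batched readers, and declares `IOException`). The point is only the bound: `SplitT extends SourceSplit` rather than `FileSourceSplit`, so a split with no file path/offset/length can plug in.

```java
// Simplified stand-in for Flink's SourceSplit.
interface SourceSplit {
    String splitId();
}

// The proposed shape: SplitT is bounded by SourceSplit, not FileSourceSplit.
interface BulkFormat<T, SplitT extends SourceSplit> {
    Reader<T> createReader(SplitT split);
    Reader<T> restoreReader(SplitT split);

    interface Reader<T> {
        T readNext();  // returns null when the split is exhausted
    }
}

// A split with no file path/offset/length fields, the way an
// IcebergSourceSplit (wrapping a scan task) would look.
class IcebergLikeSplit implements SourceSplit {
    private final String taskDescription;

    IcebergLikeSplit(String taskDescription) {
        this.taskDescription = taskDescription;
    }

    @Override
    public String splitId() {
        return taskDescription;
    }
}
```

With the `FileSourceSplit` bound, `IcebergLikeSplit` could not implement the interface without carrying meaningless path/offset/length fields; with the `SourceSplit` bound it can.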
[jira] [Comment Edited] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233260#comment-17233260 ] Steven Zhen Wu edited comment on FLINK-20174 at 11/17/20, 5:20 AM: --- [~lzljs3620320] thanks a lot for sharing your thoughts. Regarding the hostname, it would be derived information from the path of the Iceberg DataFile. How to extract the hostname depends on the file system. I would imagine LocalityAwareSplitAssigner probably needs to take a HostnameExtractor function to extract the hostname from IcebergSourceSplit. I am wondering if the hostname should be a constructor arg for IcebergSourceSplit. Regarding the fine-grained split, here are my concerns about splitting CombinedScanTask into fine-grained FileScanTasks. * In the first PoC attempt, I tried flatMapping each CombinedScanTask into multiple CombinedScanTasks (each with a single FileScanTask). If I remember correctly, DeleteFilter doesn't work in this case. DataIterator creates the InputFile map for the whole CombinedScanTask. Maybe there is a valid reason for that. I can double-check that with a unit test. * One of the main reasons for having CombinedScanTask is to combine small files/splits into a decent size. Because readers pull one split at a time, avoiding small splits is good for throughput. We can mitigate this by breaking CombinedScanTask into individual FileScanTasks. But that would not work with checkpointing. was (Author: stevenz3wu): [~lzljs3620320] thanks a lot for sharing your thoughts. Regarding the hostname, it would be a derived information from the path of Iceberg DataFile. How to extract the hostname depends on the file system. I would image LocalityAwareSplitAssigner probably needs to take a HostnameExtractor function to extract hostname from IcebergSourceSplit. I am wondering if hostname should be a constructor arg for IcebergSourceSplit. Regarding the fine-grained split, here are my concerns of splitting CombinedScanTask into fine-grained FileScanTasks. 
* In the first try of PoC, I tried flapMap of CombinedScanTask into CombinedScanTasks (each with a single FileScanTask). If I remember correctly, DeleteFilter doesn't work in this case. DataIterator creates the InputFile map for the whole CombinedScanTask. Maybe there is a valid reason for that. I can double check on that with a unit test. * One of the main reasons of having CombinedScanTask is to combine small files/splits into a decent size. Because readers pull one split at a time, avoiding small splits is good for throughput. > Make BulkFormat more extensible > --- > > Key: FLINK-20174 > URL: https://issues.apache.org/jira/browse/FLINK-20174 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.12.0 >Reporter: Steven Zhen Wu >Priority: Major > > Right now, BulkFormat has the generic `SplitT` type extending from > `FileSourceSplit`. We can make BulkFormat take a generic `SplitT` type > extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to > extend from `FileSourceSplit` and the Iceberg source can reuse this BulkFormat > interface as [~lzljs3620320] suggested. This allows the Iceberg source to take > advantage of the high-performance `ParquetVectorizedInputFormat` provided by Flink. > [~sewen] [~lzljs3620320] if you are on board with the change, I would be happy > to submit a PR. Since it is a breaking change, maybe we can only add it to > the master branch after the 1.12 release branch is cut? > The other related question is the two `createReader` and `restoreReader` > APIs. I understand the motivation. I am just wondering if the separation is > necessary. If the SplitT has a CheckpointedPosition, the seek operation can > be handled internally in `createReader`. We can also define an abstract > `FileSourceSplitBase` that adds a `getCheckpointedPosition` API to the > `SourceSplit`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233260#comment-17233260 ] Steven Zhen Wu commented on FLINK-20174: [~lzljs3620320] thanks a lot for sharing your thoughts. Regarding the hostname, it would be derived information from the path of the Iceberg DataFile. How to extract the hostname depends on the file system. I would imagine LocalityAwareSplitAssigner probably needs to take a HostnameExtractor function to extract the hostname from IcebergSourceSplit. I am wondering if the hostname should be a constructor arg for IcebergSourceSplit. Regarding the fine-grained split, here are my concerns about splitting CombinedScanTask into fine-grained FileScanTasks. * In the first PoC attempt, I tried flatMapping each CombinedScanTask into multiple CombinedScanTasks (each with a single FileScanTask). If I remember correctly, DeleteFilter doesn't work in this case. DataIterator creates the InputFile map for the whole CombinedScanTask. Maybe there is a valid reason for that. I can double-check that with a unit test. * One of the main reasons for having CombinedScanTask is to combine small files/splits into a decent size. Because readers pull one split at a time, avoiding small splits is good for throughput. > Make BulkFormat more extensible > --- > > Key: FLINK-20174 > URL: https://issues.apache.org/jira/browse/FLINK-20174 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.12.0 >Reporter: Steven Zhen Wu >Priority: Major > > Right now, BulkFormat has the generic `SplitT` type extending from > `FileSourceSplit`. We can make BulkFormat take a generic `SplitT` type > extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to > extend from `FileSourceSplit` and the Iceberg source can reuse this BulkFormat > interface as [~lzljs3620320] suggested. This allows the Iceberg source to take > advantage of the high-performance `ParquetVectorizedInputFormat` provided by Flink. 
> [~sewen] [~lzljs3620320] if you are on board with the change, I would be happy > to submit a PR. Since it is a breaking change, maybe we can only add it to > the master branch after the 1.12 release branch is cut? > The other related question is the two `createReader` and `restoreReader` > APIs. I understand the motivation. I am just wondering if the separation is > necessary. If the SplitT has a CheckpointedPosition, the seek operation can > be handled internally in `createReader`. We can also define an abstract > `FileSourceSplitBase` that adds a `getCheckpointedPosition` API to the > `SourceSplit`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-20174: --- Description: Right now, BulkFormat has the generic `SplitT` type extending from `FileSourceSplit`. We can make BulkFormat take a generic `SplitT` type extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to extend from `FileSourceSplit` and the Iceberg source can reuse this BulkFormat interface as [~lzljs3620320] suggested. This allows the Iceberg source to take advantage of the high-performance `ParquetVectorizedInputFormat` provided by Flink. [~sewen] [~lzljs3620320] if you are on board with the change, I would be happy to submit a PR. Since it is a breaking change, maybe we can only add it to the master branch after the 1.12 release branch is cut? The other related question is the two `createReader` and `restoreReader` APIs. I understand the motivation. I am just wondering if the separation is necessary. If the SplitT has a CheckpointedPosition, the seek operation can be handled internally in `createReader`. We can also define an abstract `FileSourceSplitBase` that adds a `getCheckpointedPosition` API to the `SourceSplit`. was: Right now, BulkFormat has the generic `SpitT` type extending from `FileSourceSplit`. We can make BulkFormat taking the generic `SplitT` type extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to extend from `FileSourceSplit` and Iceberg source can reuse this BulkFormat interface as [~lzljs3620320] suggested. This allows Iceberg source to take advantages high-performant `ParquetVectorizedInputFormat` provided by Flink. [~sewen] [~lzljs3620320] if you are onboard with the change, I would be happy to submit a PR. Although there is a question if it is ok to make the API change after 1.12.0 code freeze? The other related question is the two `createReader` and `restoreReader` APIs. I understand the motivation. I am just wondering if the separation is necessary. 
if the SplitT has the CheckpointedLocation, the seek operation can be handled internal to `createReader`. We can also define an abstract `BaseFileSourceSplit` that only adds a `CheckpointedPosition` getter API. > Make BulkFormat more extensible > --- > > Key: FLINK-20174 > URL: https://issues.apache.org/jira/browse/FLINK-20174 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.12.0 >Reporter: Steven Zhen Wu >Priority: Major > > Right now, BulkFormat has the generic `SplitT` type extending from > `FileSourceSplit`. We can make BulkFormat take a generic `SplitT` type > extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to > extend from `FileSourceSplit` and the Iceberg source can reuse this BulkFormat > interface as [~lzljs3620320] suggested. This allows the Iceberg source to take > advantage of the high-performance `ParquetVectorizedInputFormat` provided by Flink. > [~sewen] [~lzljs3620320] if you are on board with the change, I would be happy > to submit a PR. Since it is a breaking change, maybe we can only add it to > the master branch after the 1.12 release branch is cut? > The other related question is the two `createReader` and `restoreReader` > APIs. I understand the motivation. I am just wondering if the separation is > necessary. If the SplitT has a CheckpointedPosition, the seek operation can > be handled internally in `createReader`. We can also define an abstract > `FileSourceSplitBase` that adds a `getCheckpointedPosition` API to the > `SourceSplit`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-20174) Make BulkFormat more extensible
[ https://issues.apache.org/jira/browse/FLINK-20174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-20174: --- Description: Right now, BulkFormat has the generic `SplitT` type extending from `FileSourceSplit`. We can make BulkFormat take a generic `SplitT` type extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to extend from `FileSourceSplit` and the Iceberg source can reuse this BulkFormat interface as [~lzljs3620320] suggested. This allows the Iceberg source to take advantage of the high-performance `ParquetVectorizedInputFormat` provided by Flink. [~sewen] [~lzljs3620320] if you are on board with the change, I would be happy to submit a PR. Although there is a question whether it is OK to make the API change after the 1.12.0 code freeze? The other related question is the two `createReader` and `restoreReader` APIs. I understand the motivation. I am just wondering if the separation is necessary. If the SplitT has a CheckpointedPosition, the seek operation can be handled internally in `createReader`. We can also define an abstract `BaseFileSourceSplit` that only adds a `CheckpointedPosition` getter API. was: Right now, BulkFormat has the generic `SpitT` type extending from `FileSourceSplit`. We can make BulkFormat taking the generic `SplitT` type extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to extend from `FileSourceSplit` and Iceberg source can reuse this BulkFormat interface as [~lzljs3620320] suggested. This allows Iceberg source to take advantages high-performant `ParquetVectorizedInputFormat` provided by Flink. [~sewen] [~lzljs3620320] if you are onboard with the change, I would be happy to submit a PR. Although there is a question if it is ok to make the API change after 1.12.0 code freeze? The other related question is the two `createReader` and `restoreReader` APIs. I understand the motivation. I am just wondering if the separation is necessary. 
if the SplitT has the CheckpointedLocation, the seek operation can be handled internal to `createReader`. > Make BulkFormat more extensible > --- > > Key: FLINK-20174 > URL: https://issues.apache.org/jira/browse/FLINK-20174 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.12.0 >Reporter: Steven Zhen Wu >Priority: Major > > Right now, BulkFormat has the generic `SplitT` type extending from > `FileSourceSplit`. We can make BulkFormat take a generic `SplitT` type > extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to > extend from `FileSourceSplit` and the Iceberg source can reuse this BulkFormat > interface as [~lzljs3620320] suggested. This allows the Iceberg source to take > advantage of the high-performance `ParquetVectorizedInputFormat` provided by Flink. > [~sewen] [~lzljs3620320] if you are on board with the change, I would be happy > to submit a PR. Although there is a question whether it is OK to make the API > change after the 1.12.0 code freeze? > The other related question is the two `createReader` and `restoreReader` > APIs. I understand the motivation. I am just wondering if the separation is > necessary. If the SplitT has a CheckpointedPosition, the seek operation can > be handled internally in `createReader`. We can also define an abstract > `BaseFileSourceSplit` that only adds a `CheckpointedPosition` getter API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20174) Make BulkFormat more extensible
Steven Zhen Wu created FLINK-20174: -- Summary: Make BulkFormat more extensible Key: FLINK-20174 URL: https://issues.apache.org/jira/browse/FLINK-20174 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Affects Versions: 1.12.0 Reporter: Steven Zhen Wu Right now, BulkFormat has the generic `SplitT` type extending from `FileSourceSplit`. We can make BulkFormat take a generic `SplitT` type extending from `SourceSplit`. This way, IcebergSourceSplit doesn't have to extend from `FileSourceSplit` and the Iceberg source can reuse this BulkFormat interface as [~lzljs3620320] suggested. This allows the Iceberg source to take advantage of the high-performance `ParquetVectorizedInputFormat` provided by Flink. [~sewen] [~lzljs3620320] if you are on board with the change, I would be happy to submit a PR. Although there is a question whether it is OK to make the API change after the 1.12.0 code freeze? The other related question is the two `createReader` and `restoreReader` APIs. I understand the motivation. I am just wondering if the separation is necessary. If the SplitT has a CheckpointedPosition, the seek operation can be handled internally in `createReader`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19802) Let BulkFormat createReader and restoreReader methods accept Splits directly
[ https://issues.apache.org/jira/browse/FLINK-19802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228338#comment-17228338 ] Steven Zhen Wu commented on FLINK-19802: [~sewen] sorry, I didn't mean that we need this before the 1.12.0 release. Yes, currently I am also thinking about just reusing some components like BulkFormat/reader. > Let BulkFormat createReader and restoreReader methods accept Splits directly > > > Key: FLINK-19802 > URL: https://issues.apache.org/jira/browse/FLINK-19802 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.12.0 > > > To support sources where the splits communicate additional information, the > BulkFormats should accept a generic split type, instead of path/offset/length > from the splits directly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19802) Let BulkFormat createReader and restoreReader methods accept Splits directly
[ https://issues.apache.org/jira/browse/FLINK-19802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17227644#comment-17227644 ] Steven Zhen Wu commented on FLINK-19802: [~sewen][~lzljs3620320] to make the `BulkFormat` truly generic and reusable by the Iceberg source, can we avoid `SplitT` extending from `FileSourceSplit` and just leave it as a pure generic type `SplitT`? I think `IcebergSourceSplit` shouldn't extend from `FileSourceSplit`, because `FileSourceSplit` contains many fields that are not meaningful for IcebergSourceSplit (e.g. filePath, offset, length, etc.). Iceberg already captures that info in its own data structures. > Let BulkFormat createReader and restoreReader methods accept Splits directly > > > Key: FLINK-19802 > URL: https://issues.apache.org/jira/browse/FLINK-19802 > Project: Flink > Issue Type: Sub-task > Components: Connectors / FileSystem >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.12.0 > > > To support sources where the splits communicate additional information, the > BulkFormats should accept a generic split type, instead of path/offset/length > from the splits directly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19934) [FLIP-27 source] add new API: SplitEnumeratorContext.runInCoordinatorThread(Runnable)
[ https://issues.apache.org/jira/browse/FLINK-19934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-19934: --- Summary: [FLIP-27 source] add new API: SplitEnumeratorContext.runInCoordinatorThread(Runnable) (was: [FLIP-27 source] add new API: SplitEnumeratorContext.execute(Runnable)) > [FLIP-27 source] add new API: > SplitEnumeratorContext.runInCoordinatorThread(Runnable) > - > > Key: FLINK-19934 > URL: https://issues.apache.org/jira/browse/FLINK-19934 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 1.11.2 >Reporter: Steven Zhen Wu >Priority: Major > Labels: pull-request-available > > Here is the motivation use case. We are implementing event-time alignment > across sources in the Iceberg source. Basically, each Iceberg source/enumerator > tracks its watermark using min/max timestamps captured in the column stats of > the data files. > When the watermark from another source advances, the notified source/enumerator > can try `assignSplits` as constraints may be satisfied now. This callback is > initiated from the coordinator thread of the other source. If we have a > `SplitEnumeratorContext.execute(Runnable r)` API, we can ensure that all the > actions by enumerator and assigner are serialized by the coordinator thread. > That avoids the need for locks. > [~becket_qin] [~sewen] what do you think? cc [~sundaram] -- This message was sent by Atlassian Jira (v8.3.4#803005)
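The execution model behind the proposed API can be sketched with a single-threaded executor. `runInCoordinatorThread` and the state names here are illustrative stand-ins for the proposed method, not Flink's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the execution model behind the proposed
// SplitEnumeratorContext.runInCoordinatorThread(Runnable): every mutation of
// enumerator/assigner state is funneled onto one coordinator thread, so no
// locks are needed even when callbacks arrive from other sources' threads.
public class CoordinatorThreadSketch {
    // Stand-in for the source coordinator's single-threaded executor.
    final ExecutorService coordinatorThread = Executors.newSingleThreadExecutor();
    // Enumerator state, touched only from the coordinator thread.
    final List<String> assignedSplits = new ArrayList<>();

    // Hypothetical equivalent of the proposed runInCoordinatorThread(Runnable).
    public void runInCoordinatorThread(Runnable action) {
        coordinatorThread.execute(action);
    }

    // Callback fired from ANOTHER source's coordinator thread when its
    // watermark advances; it hops onto this coordinator thread before
    // touching any enumerator state.
    public void onWatermarkAdvanced(String split) {
        runInCoordinatorThread(() -> assignedSplits.add(split));
    }

    public List<String> shutdownAndGetAssignments() throws InterruptedException {
        coordinatorThread.shutdown();
        coordinatorThread.awaitTermination(10, TimeUnit.SECONDS);
        return assignedSplits;
    }

    public static void main(String[] args) throws InterruptedException {
        CoordinatorThreadSketch ctx = new CoordinatorThreadSketch();
        ctx.onWatermarkAdvanced("split-1");
        ctx.onWatermarkAdvanced("split-2");
        System.out.println(ctx.shutdownAndGetAssignments()); // [split-1, split-2]
    }
}
```

Because a single-threaded executor runs submitted tasks one at a time in FIFO order, all state mutations are serialized without any explicit synchronization, which is the lock-free property the ticket is after.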
[jira] [Commented] (FLINK-19816) Flink restored from a wrong checkpoint (a very old one and not the last completed one)
[ https://issues.apache.org/jira/browse/FLINK-19816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17226740#comment-17226740 ] Steven Zhen Wu commented on FLINK-19816: [~trohrmann] thanks a lot for taking a look. forwarded the logs to you. but it seems that we may not need them > Flink restored from a wrong checkpoint (a very old one and not the last > completed one) > -- > > Key: FLINK-19816 > URL: https://issues.apache.org/jira/browse/FLINK-19816 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.11.0, 1.12.0 >Reporter: Steven Zhen Wu >Priority: Blocker > Fix For: 1.12.0, 1.11.3 > > > h2. Summary > Upon failure, it seems that Flink didn't restore from the last completed > checkpoint. Instead, it restored from a very old checkpoint. As a result, > Kafka offsets are invalid and caused the job to replay from the beginning as > Kafka consumer "auto.offset.reset" was set to "EARLIEST". > This is an embarrassingly parallel stateless job. Parallelism is over 1,000. > I have the full log file from jobmanager at INFO level available upon request. > h2. Sequence of events from the logs > Just before the failure, checkpoint *210768* completed. > {code} > 2020-10-25 02:35:05,970 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > [jobmanager-future-thread-5] - Completed checkpoint 210768 for job > 233b4938179c06974e4535ac8a868675 (4623776 bytes in 120402 ms). > {code} > During restart, somehow it decided to restore from a very old checkpoint > *203531*. > {code:java} > 2020-10-25 02:36:03,301 INFO > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess > [cluster-io-thread-3] - Start SessionDispatcherLeaderProcess. > 2020-10-25 02:36:03,302 INFO > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess > [cluster-io-thread-5] - Recover all persisted job graphs. 
> 2020-10-25 02:36:03,304 INFO com.netflix.bdp.s3fs.BdpS3FileSystem > [cluster-io-thread-25] - Deleting path: > s3:///checkpoints/XM3B/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/c31aec1e-07a7-4193-aa00-3fbe83f9e2e6 > 2020-10-25 02:36:03,307 INFO > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess > [cluster-io-thread-5] - Trying to recover job with job id > 233b4938179c06974e4535ac8a868675. > 2020-10-25 02:36:03,381 INFO com.netflix.bdp.s3fs.BdpS3FileSystem > [cluster-io-thread-25] - Deleting path: > s3:///checkpoints/Hh86/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/4ab92f70-dfcd-4212-9b7f-bdbecb9257fd > ... > 2020-10-25 02:36:03,427 INFO > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore > [flink-akka.actor.default-dispatcher-82003] - Recovering checkpoints from > ZooKeeper. > 2020-10-25 02:36:03,432 INFO > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore > [flink-akka.actor.default-dispatcher-82003] - Found 0 checkpoints in > ZooKeeper. > 2020-10-25 02:36:03,432 INFO > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore > [flink-akka.actor.default-dispatcher-82003] - Trying to fetch 0 checkpoints > from storage. > 2020-10-25 02:36:03,432 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator > [flink-akka.actor.default-dispatcher-82003] - Starting job > 233b4938179c06974e4535ac8a868675 from savepoint > s3:///checkpoints/metadata/clapp_avro-clapp_avro_nontvui/1113/47e2a25a8d0b696c7d0d423722bb6f54/chk-203531/_metadata > () > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-19816) Flink restored from a wrong checkpoint (a very old one and not the last completed one)
[ https://issues.apache.org/jira/browse/FLINK-19816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17226460#comment-17226460 ] Steven Zhen Wu edited comment on FLINK-19816 at 11/5/20, 6:10 AM: -- This happened again for the same job in production. I noticed both failures started with zookeeper failure. Here are some observations on the sequence of events. My hypothesis is that the race condition / interactions between recovering from zk failure and failure-rate restart-strategy caused this problem of restoring a wrong and very old checkpoint. [~trohrmann] any comment? 1. initially, there were some problems with zookeeper that caused the job to fail {code} 2020-10-25 02:35:59,266 WARN org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [main-SendThread(ip-100-81-150-64.us-west-2.compute.internal:2181)] - Client session timed out, have not heard from server in 26676 ms for sessionid 0x363850d0d034d9d 2020-10-25 02:35:59,268 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [main-SendThread(ip-100-81-150-64.us-west-2.compute.internal:2181)] - Client session timed out, have not heard from server in 26676 ms for sessionid 0x363850d0d034d9d, closing socket connection and attempting reconnect 2020-10-25 02:35:59,282 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3://us-west-2.spaas.prod/checkpoints/r7E1/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4 535ac8a868675/chk-210758/16d4e138-4199-4fd2-a014-4b394189f72b 2020-10-25 02:35:59,369 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [main-EventThread] - State change: SUSPENDED {code} 2. This job is configured with `restart-strategy=failure-rate`. and there are enough task restarts to trigger the terminal condition canRestart() to return false. This should eventually lead the Flink job to halt/terminal state. 
{code} 2020-10-25 02:35:59,641 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [flink-akka.actor.default-dispatcher-81991] - Job clapp-avro-nontvui (233b4938179c06974e4535ac8a868675) switched from state FAILING to FA ILED. org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=180,backoffTimeMS=3,maxFailuresPerInterval=20) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:203) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:508) at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49) at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1725) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1287) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1255) at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:954) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165) at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732) at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537) at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149) at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818) at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777) at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:435) at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:352) at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:216) at
[jira] [Comment Edited] (FLINK-19816) Flink restored from a wrong checkpoint (a very old one and not the last completed one)
[ https://issues.apache.org/jira/browse/FLINK-19816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17226460#comment-17226460 ] Steven Zhen Wu edited comment on FLINK-19816 at 11/5/20, 3:53 AM: -- This happened again for the same job in production. I noticed both failures started with zookeeper failure. Here are some observations on the sequence of events. My hypothesis is that the race condition / interactions between recovering from zk failure and failure-rate restart-strategy caused this problem of restoring a wrong and very old checkpoint. [~trohrmann] any comment? 1. initially, there were some problems with zookeeper that caused the job to fail {code} 2020-10-25 02:35:59,266 WARN org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [main-SendThread(ip-100-81-150-64.us-west-2.compute.internal:2181)] - Client session timed out, have not heard from server in 26676 ms for sessionid 0x363850d0d034d9d 2020-10-25 02:35:59,268 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [main-SendThread(ip-100-81-150-64.us-west-2.compute.internal:2181)] - Client session timed out, have not heard from server in 26676 ms for sessionid 0x363850d0d034d9d, closing socket connection and attempting reconnect 2020-10-25 02:35:59,282 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3://us-west-2.spaas.prod/checkpoints/r7E1/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4 535ac8a868675/chk-210758/16d4e138-4199-4fd2-a014-4b394189f72b 2020-10-25 02:35:59,369 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [main-EventThread] - State change: SUSPENDED {code} 2. This job is configured with `restart-strategy=failure-rate`. and there are enough task restarts to trigger the terminal condition canRestart() to return false. This should eventually lead the Flink job to halt/terminal state. 
{code} 2020-10-25 02:35:59,641 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [flink-akka.actor.default-dispatcher-81991] - Job clapp-avro-nontvui (233b4938179c06974e4535ac8a868675) switched from state FAILING to FA ILED. org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=180,backoffTimeMS=3,maxFailuresPerInterval=20) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:203) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:508) at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49) at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1725) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1287) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1255) at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:954) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165) at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732) at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537) at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149) at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818) at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777) at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:435) at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:352) at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:216) at
[jira] [Commented] (FLINK-19816) Flink restored from a wrong checkpoint (a very old one and not the last completed one)
[ https://issues.apache.org/jira/browse/FLINK-19816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17226460#comment-17226460 ] Steven Zhen Wu commented on FLINK-19816: This happened again for the same job in production. I noticed both failures started with zookeeper failure. Here are some observations on the sequence of events. My hypothesis is that the race condition / interactions between recovering from zk failure and failure-rate restart-strategy caused this problem of restoring a wrong and very old checkpoint. [~trohrmann] 1. initially, there were some problems with zookeeper that caused the job to fail {code} 2020-10-25 02:35:59,266 WARN org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [main-SendThread(ip-100-81-150-64.us-west-2.compute.internal:2181)] - Client session timed out, have not heard from server in 26676 ms for sessionid 0x363850d0d034d9d 2020-10-25 02:35:59,268 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [main-SendThread(ip-100-81-150-64.us-west-2.compute.internal:2181)] - Client session timed out, have not heard from server in 26676 ms for sessionid 0x363850d0d034d9d, closing socket connection and attempting reconnect 2020-10-25 02:35:59,282 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3://us-west-2.spaas.prod/checkpoints/r7E1/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4 535ac8a868675/chk-210758/16d4e138-4199-4fd2-a014-4b394189f72b 2020-10-25 02:35:59,369 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [main-EventThread] - State change: SUSPENDED {code} 2. This job is configured with `restart-strategy=failure-rate`. and there are enough task restarts to trigger the terminal condition canRestart() to return false. This should eventually lead the Flink job to halt/terminal state. 
{code} 2020-10-25 02:35:59,641 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [flink-akka.actor.default-dispatcher-81991] - Job clapp-avro-nontvui (233b4938179c06974e4535ac8a868675) switched from state FAILING to FA ILED. org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=180,backoffTimeMS=3,maxFailuresPerInterval=20) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:203) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:508) at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49) at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1725) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1287) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1255) at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:954) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165) at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732) at 
org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537) at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149) at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818) at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777) at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:435) at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:352) at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:216) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:514) at
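For context, the `failure-rate` restart strategy mentioned in step 2 above is configured in `flink-conf.yaml` with the standard Flink options below. The interval and delay values here are illustrative guesses only, since the values printed in the log (`failuresIntervalMS=180,backoffTimeMS=3`) appear truncated; `maxFailuresPerInterval=20` is taken from the log as-is:

```yaml
# Failure-rate restart strategy: give up (canRestart() returns false) once
# more than max-failures-per-interval failures occur within the interval.
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 20   # from the log
restart-strategy.failure-rate.failure-rate-interval: 3 min    # illustrative value
restart-strategy.failure-rate.delay: 30 s                     # illustrative value
```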
[jira] [Comment Edited] (FLINK-19934) [FLIP-27 source] add new API: SplitEnumeratorContext.execute(Runnable)
[ https://issues.apache.org/jira/browse/FLINK-19934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17225572#comment-17225572 ] Steven Zhen Wu edited comment on FLINK-19934 at 11/3/20, 5:46 PM: -- [~sewen] we thought about that. the difference is that the no-op callable will be executed in the IO worker thread pool first, where threads may be tied up for a significantly long time (e.g. split planning can take dozens of seconds). After the previous no-op callable stage is done, the handler function got executed in the coordinator thread. The potential long delay in the IO thread pool step is what we are trying to avoid was (Author: stevenz3wu): [~sewen] we thought about that. the difference is that the no-op callable will be executed in the IO worker thread pool first, where threads may be tied up for a significantly long time (e.g. split planning can take dozens of seconds). After the previous no-op callable stage is done, the handler function got executed in the coordinator pool. The potential long delay in the IO thread pool step is what we are trying to avoid > [FLIP-27 source] add new API: SplitEnumeratorContext.execute(Runnable) > -- > > Key: FLINK-19934 > URL: https://issues.apache.org/jira/browse/FLINK-19934 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 1.11.2 >Reporter: Steven Zhen Wu >Priority: Major > > Here is the motivation use case. We are implementing event-time alignment > across sources in Iceberg source. Basically, each Iceberg source/enumerator > tracks its watermark using min/max timestamps captures in the column stats of > the data files. > When the watermark from another source advances, notified source/enumerator > can try `assignSplits` as constraints may be satisfied now. This callback is > initiated from the coordinator thread from the other source. 
If we have > `SplitEnumeratorContext.execute(Runnable r)` API, we can ensure that all the > actions by enumerator and assigner are serialized by the coordinator thread. > That can avoid the need of locks. > [~becket_qin] [~sewen] what do you think? cc [~sundaram] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-19934) [FLIP-27 source] add new API: SplitEnumeratorContext.execute(Runnable)
[ https://issues.apache.org/jira/browse/FLINK-19934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17225572#comment-17225572 ] Steven Zhen Wu edited comment on FLINK-19934 at 11/3/20, 5:43 PM: -- [~sewen] we thought about that. the difference is that the no-op callable will be executed in the IO worker thread pool first, where threads may be tied up for a significantly long time (e.g. split planning can take dozens of seconds). After the previous no-op callable stage is done, the handler function got executed in the coordinator pool. The potential long delay in the IO thread pool step is what we are trying to avoid was (Author: stevenz3wu): [~sewen] we thought about that. the difference is that the no-op callable will be executed in the IO worker thread pool first, where threads may be tied up for a significantly long time (e.g. split planning can take dozens of seconds). Then the handler function got executed in the coordinator pool. The potential long delay in the IO thread pool step is what we are trying to avoid > [FLIP-27 source] add new API: SplitEnumeratorContext.execute(Runnable) > -- > > Key: FLINK-19934 > URL: https://issues.apache.org/jira/browse/FLINK-19934 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 1.11.2 >Reporter: Steven Zhen Wu >Priority: Major > > Here is the motivation use case. We are implementing event-time alignment > across sources in Iceberg source. Basically, each Iceberg source/enumerator > tracks its watermark using min/max timestamps captures in the column stats of > the data files. > When the watermark from another source advances, notified source/enumerator > can try `assignSplits` as constraints may be satisfied now. This callback is > initiated from the coordinator thread from the other source. If we have > `SplitEnumeratorContext.execute(Runnable r)` API, we can ensure that all the > actions by enumerator and assigner are serialized by the coordinator thread. 
> That can avoid the need of locks. > [~becket_qin] [~sewen] what do you think? cc [~sundaram] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19934) [FLIP-27 source] add new API: SplitEnumeratorContext.execute(Runnable)
[ https://issues.apache.org/jira/browse/FLINK-19934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17225572#comment-17225572 ] Steven Zhen Wu commented on FLINK-19934: [~sewen] we thought about that. the difference is that the no-op callable will be executed in the IO worker thread pool first, where threads may be tied up for a significantly long time (e.g. split planning can take dozens of seconds). Then the handler function got executed in the coordinator pool. The potential long delay in the IO thread pool step is what we are trying to avoid > [FLIP-27 source] add new API: SplitEnumeratorContext.execute(Runnable) > -- > > Key: FLINK-19934 > URL: https://issues.apache.org/jira/browse/FLINK-19934 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 1.11.2 >Reporter: Steven Zhen Wu >Priority: Major > > Here is the motivation use case. We are implementing event-time alignment > across sources in Iceberg source. Basically, each Iceberg source/enumerator > tracks its watermark using min/max timestamps captures in the column stats of > the data files. > When the watermark from another source advances, notified source/enumerator > can try `assignSplits` as constraints may be satisfied now. This callback is > initiated from the coordinator thread from the other source. If we have > `SplitEnumeratorContext.execute(Runnable r)` API, we can ensure that all the > actions by enumerator and assigner are serialized by the coordinator thread. > That can avoid the need of locks. > [~becket_qin] [~sewen] what do you think? cc [~sundaram] -- This message was sent by Atlassian Jira (v8.3.4#803005)
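The two-hop latency being described can be sketched with two single-threaded executors standing in for Flink's IO pool and coordinator thread. The `callAsync` below only mirrors the shape of `SplitEnumeratorContext.callAsync(Callable, BiConsumer)`, and the 200 ms sleep is an illustrative stand-in for a long split-planning call:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Sketch of why the no-op-callable workaround adds latency: the callable stage
// of callAsync runs on the IO pool, where it can queue behind long-running
// work (e.g. split planning), before the handler ever reaches the coordinator
// thread. Executors here are illustrative stand-ins for Flink's internals.
public class CallAsyncHopSketch {
    final ExecutorService ioPool = Executors.newSingleThreadExecutor();
    final ExecutorService coordinatorThread = Executors.newSingleThreadExecutor();

    // Simplified two-hop pipeline: callable on the IO pool, then handler on
    // the coordinator thread.
    <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        ioPool.execute(() -> {
            T result = null;
            Throwable error = null;
            try {
                result = callable.call();
            } catch (Throwable t) {
                error = t;
            }
            T r = result;
            Throwable e = error;
            coordinatorThread.execute(() -> handler.accept(r, e));
        });
    }

    public static void main(String[] args) throws InterruptedException {
        CallAsyncHopSketch ctx = new CallAsyncHopSketch();
        long start = System.nanoTime();
        // Simulate a long-running split-planning call occupying the IO pool.
        ctx.ioPool.execute(() -> {
            try { Thread.sleep(200); } catch (InterruptedException ignored) { }
        });
        CountDownLatch done = new CountDownLatch(1);
        // Even a no-op callable must wait for the split planning above.
        ctx.callAsync(() -> null, (r, e) -> done.countDown());
        done.await();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("handler reached coordinator thread after ~" + elapsedMs + " ms");
        ctx.ioPool.shutdown();
        ctx.coordinatorThread.shutdown();
    }
}
```

A direct `runInCoordinatorThread(Runnable)` would skip the IO-pool hop entirely, which is the delay the comment above is trying to avoid.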
[jira] [Updated] (FLINK-19934) [FLIP-27 source] add new API: SplitEnumeratorContext.execute(Runnable)
[ https://issues.apache.org/jira/browse/FLINK-19934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-19934: --- Description: Here is the motivation use case. We are implementing event-time alignment across sources in Iceberg source. Basically, each Iceberg source/enumerator tracks its watermark using min/max timestamps captures in the column stats of the data files. When the watermark from another source advances, notified source/enumerator can try `assignSplits` as constraints may be satisfied now. This callback is initiated from the coordinator thread from the other source. If we have `SplitEnumeratorContext.execute(Runnable r)` API, we can ensure that all the actions by enumerator and assigner are serialized by the coordinator thread. That can avoid the need of locks. [~becket_qin] [~sewen] what do you think? cc [~sundaram] was: Here is the motivation use case. We are implementing event-time alignment across sources in Iceberg source. Basically, each Iceberg source/enumerator tracks its watermark using min/max timestamps captures in the column stats of the data files. When the watermark from another source advances, notified source/enumerator can try `assignSplits` as constraints may be satisfied now. This callback is initiated from the coordinator thread from the other source. If we have `SplitEnumeratorContext.execute(Runnable r)` API, we can ensure that all the actions by enumerator and assigner are serialized by the coordinator thread. That can avoid the need of locks. [~becket_qin] [~sewen] what do you think? > [FLIP-27 source] add new API: SplitEnumeratorContext.execute(Runnable) > -- > > Key: FLINK-19934 > URL: https://issues.apache.org/jira/browse/FLINK-19934 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 1.11.2 >Reporter: Steven Zhen Wu >Priority: Major > > Here is the motivation use case. 
We are implementing event-time alignment > across sources in Iceberg source. Basically, each Iceberg source/enumerator > tracks its watermark using min/max timestamps captures in the column stats of > the data files. > When the watermark from another source advances, notified source/enumerator > can try `assignSplits` as constraints may be satisfied now. This callback is > initiated from the coordinator thread from the other source. If we have > `SplitEnumeratorContext.execute(Runnable r)` API, we can ensure that all the > actions by enumerator and assigner are serialized by the coordinator thread. > That can avoid the need of locks. > [~becket_qin] [~sewen] what do you think? cc [~sundaram] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19934) [FLIP-27 source] add new API: SplitEnumeratorContext.execute(Runnable)
Steven Zhen Wu created FLINK-19934: -- Summary: [FLIP-27 source] add new API: SplitEnumeratorContext.execute(Runnable) Key: FLINK-19934 URL: https://issues.apache.org/jira/browse/FLINK-19934 Project: Flink Issue Type: New Feature Components: API / DataStream Affects Versions: 1.11.2 Reporter: Steven Zhen Wu Here is the motivation use case. We are implementing event-time alignment across sources in the Iceberg source. Basically, each Iceberg source/enumerator tracks its watermark using min/max timestamps captured in the column stats of the data files. When the watermark from another source advances, the notified source/enumerator can try `assignSplits` as constraints may be satisfied now. This callback is initiated from the coordinator thread of the other source. If we have a `SplitEnumeratorContext.execute(Runnable r)` API, we can ensure that all the actions by enumerator and assigner are serialized by the coordinator thread. That avoids the need for locks. [~becket_qin] [~sewen] what do you think? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-19816) Flink restored from a wrong checkpoint (a very old one and not the last completed one)
[ https://issues.apache.org/jira/browse/FLINK-19816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-19816: --- Description: h2. Summary Upon failure, it seems that Flink didn't restore from the last completed checkpoint. Instead, it restored from a very old checkpoint. As a result, the Kafka offsets were invalid, causing the job to replay from the beginning because the Kafka consumer "auto.offset.reset" was set to "EARLIEST". This is an embarrassingly parallel stateless job. Parallelism is over 1,000. I have the full log file from jobmanager at INFO level available upon request. h2. Sequence of events from the logs Just before the failure, checkpoint *210768* completed. {code} 2020-10-25 02:35:05,970 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [jobmanager-future-thread-5] - Completed checkpoint 210768 for job 233b4938179c06974e4535ac8a868675 (4623776 bytes in 120402 ms). {code} During restart, somehow it decided to restore from a very old checkpoint *203531*. {code:java} 2020-10-25 02:36:03,301 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-3] - Start SessionDispatcherLeaderProcess. 2020-10-25 02:36:03,302 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Recover all persisted job graphs. 2020-10-25 02:36:03,304 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/XM3B/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/c31aec1e-07a7-4193-aa00-3fbe83f9e2e6 2020-10-25 02:36:03,307 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Trying to recover job with job id 233b4938179c06974e4535ac8a868675. 
2020-10-25 02:36:03,381 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/Hh86/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/4ab92f70-dfcd-4212-9b7f-bdbecb9257fd ... 2020-10-25 02:36:03,427 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Recovering checkpoints from ZooKeeper. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Found 0 checkpoints in ZooKeeper. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Trying to fetch 0 checkpoints from storage. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [flink-akka.actor.default-dispatcher-82003] - Starting job 233b4938179c06974e4535ac8a868675 from savepoint s3:///checkpoints/metadata/clapp_avro-clapp_avro_nontvui/1113/47e2a25a8d0b696c7d0d423722bb6f54/chk-203531/_metadata () {code} was: h2. Summary Upon failure, it seems that Flink didn't restore from the last completed checkpoint. Instead, it restored from a very old checkpoint. As a result, Kafka offsets are invalid and caused the job to replay from the beginning as Kafka consumer "auto.offset.reset" was set to "EARLIEST". This is an embarrassingly parallel stateless job. Parallelism is over 1,000. I have the full log file from jobmanager at INFO level. h2. Sequence of events from the logs Just before the failure, checkpoint *210768* completed. {code} 2020-10-25 02:35:05,970 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [jobmanager-future-thread-5] - Completed checkpoint 210768 for job 233b4938179c06974e4535ac8a868675 (4623776 bytes in 120402 ms). {code} During restart, somehow it decided to restore from a very old checkpoint *203531*. 
{code:java} 2020-10-25 02:36:03,301 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-3] - Start SessionDispatcherLeaderProcess. 2020-10-25 02:36:03,302 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Recover all persisted job graphs. 2020-10-25 02:36:03,304 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/XM3B/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/c31aec1e-07a7-4193-aa00-3fbe83f9e2e6 2020-10-25 02:36:03,307 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Trying to recover job with job id 233b4938179c06974e4535ac8a868675. 2020-10-25 02:36:03,381 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path:
[jira] [Comment Edited] (FLINK-19799) Make FileSource extensible
[ https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221102#comment-17221102 ] Steven Zhen Wu edited comment on FLINK-19799 at 10/27/20, 3:17 AM: --- We just went through a similar exercise with the [Iceberg source PoC|https://github.com/stevenzwu/iceberg/pull/2/files]. We want to make the assigner pluggable (no ordering guarantee, some ordering guarantee, locality-aware, etc.). Different assigners may have different state types for checkpointing. That is why we had to add generic type parameters for the assigner state and its serializer too. We did make IcebergSource generic {code} public class IcebergSource< T, SplitAssignerStateT extends SplitAssignerState, SplitAssignerT extends SplitAssigner, SplitAssignerStateSerializerT extends SplitAssignerStateSerializer> {code} We simplified the construction in the builder. {code} public static Builder useSimpleAssigner(TableLoader tableLoader) { SimpleSplitAssignerFactory assignerFactory = new SimpleSplitAssignerFactory(); return new Builder<>(tableLoader, assignerFactory); } {code} The end result is still simple for users if they don't need to keep a reference to the IcebergSource object. {code} final DataStream stream = env.fromSource( IcebergSource.useSimpleAssigner(tableLoader()) .iteratorFactory(new RowDataIteratorFactory()) .config(config) .scanContext(scanContext) .build(), ... {code} was (Author: stevenz3wu): We just went through a similar exercise with the [Iceberg source PoC|https://github.com/stevenzwu/iceberg/pull/2/files]. We want to make assigner pluggable (no order/locale guarantee, some ordering guarantee, local aware etc.). Different assigner may have different state type for checkpoint. That is why we have to add generic types for assigner state and serializer too. We did make IcebergSource generic {code} public class IcebergSource< T, SplitAssignerStateT extends SplitAssignerState, SplitAssignerT extends SplitAssigner, SplitAssignerStateSerializerT extends SplitAssignerStateSerializer> {code} We simplified the construction in builder. 
{code} public static Builder useSimpleAssigner(TableLoader tableLoader) { SimpleSplitAssignerFactory assignerFactory = new SimpleSplitAssignerFactory(); return new Builder<>(tableLoader, assignerFactory); } {code} The end result is still simple for users if they don't need to keep a reference to the IcebergSource object. {code} final DataStream stream = env.fromSource( IcebergSource.useSimpleAssigner(tableLoader()) .iteratorFactory(new RowDataIteratorFactory()) .config(config) .scanContext(scanContext) .build(), ... {code} > Make FileSource extensible > -- > > Key: FLINK-19799 > URL: https://issues.apache.org/jira/browse/FLINK-19799 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.12.0 > > > The File System Source currently assumes all formats can represent their work > units as {{FileSourceSplit}}. If that is not the case, the formats cannot be > implemented using the {{FileSource}}. > We need to support extending the splits to carry additional information in > the splits, and to use that information when creating bulk readers and > handling split state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-19799) Make FileSource extensible
[ https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221102#comment-17221102 ] Steven Zhen Wu edited comment on FLINK-19799 at 10/27/20, 3:17 AM: --- We just went through a similar exercise with the [Iceberg source PoC|https://github.com/stevenzwu/iceberg/pull/2/files]. We want to make the assigner pluggable (no ordering guarantee, some ordering guarantee, locality-aware, etc.). Different assigners may have different state types for checkpointing. That is why we had to add generic type parameters for the assigner state and its serializer too. We did make IcebergSource generic {code} public class IcebergSource< T, SplitAssignerStateT extends SplitAssignerState, SplitAssignerT extends SplitAssigner, SplitAssignerStateSerializerT extends SplitAssignerStateSerializer> {code} We simplified the construction in the builder. {code} public static Builder useSimpleAssigner(TableLoader tableLoader) { SimpleSplitAssignerFactory assignerFactory = new SimpleSplitAssignerFactory(); return new Builder<>(tableLoader, assignerFactory); } {code} The end result is still simple for users if they don't need to keep a reference to the IcebergSource object. {code} final DataStream stream = env.fromSource( IcebergSource.useSimpleAssigner(tableLoader()) .iteratorFactory(new RowDataIteratorFactory()) .config(config) .scanContext(scanContext) .build(), ... {code} was (Author: stevenz3wu): We just went through a similar exercise with the Iceberg source PoC. We want to make assigner pluggable (no order/locale guarantee, some ordering guarantee, local aware etc.). Different assigner may have different state type for checkpoint. That is why we have to add generic types for assigner state and serializer too. We did make IcebergSource generic {code} public class IcebergSource< T, SplitAssignerStateT extends SplitAssignerState, SplitAssignerT extends SplitAssigner, SplitAssignerStateSerializerT extends SplitAssignerStateSerializer> {code} We simplified the construction in builder. 
{code} public static Builder useSimpleAssigner(TableLoader tableLoader) { SimpleSplitAssignerFactory assignerFactory = new SimpleSplitAssignerFactory(); return new Builder<>(tableLoader, assignerFactory); } {code} The end result is still simple for users if they don't need to keep a reference to the IcebergSource object. {code} final DataStream stream = env.fromSource( IcebergSource.useSimpleAssigner(tableLoader()) .iteratorFactory(new RowDataIteratorFactory()) .config(config) .scanContext(scanContext) .build(), ... {code} > Make FileSource extensible > -- > > Key: FLINK-19799 > URL: https://issues.apache.org/jira/browse/FLINK-19799 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.12.0 > > > The File System Source currently assumes all formats can represent their work > units as {{FileSourceSplit}}. If that is not the case, the formats cannot be > implemented using the {{FileSource}}. > We need to support extending the splits to carry additional information in > the splits, and to use that information when creating bulk readers and > handling split state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-19799) Make FileSource extensible
[ https://issues.apache.org/jira/browse/FLINK-19799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17221102#comment-17221102 ] Steven Zhen Wu commented on FLINK-19799: We just went through a similar exercise with the Iceberg source PoC. We want to make the assigner pluggable (no ordering guarantee, some ordering guarantee, locality-aware, etc.). Different assigners may have different state types for checkpointing. That is why we had to add generic type parameters for the assigner state and its serializer too. We did make IcebergSource generic {code} public class IcebergSource< T, SplitAssignerStateT extends SplitAssignerState, SplitAssignerT extends SplitAssigner, SplitAssignerStateSerializerT extends SplitAssignerStateSerializer> {code} We simplified the construction in the builder. {code} public static Builder useSimpleAssigner(TableLoader tableLoader) { SimpleSplitAssignerFactory assignerFactory = new SimpleSplitAssignerFactory(); return new Builder<>(tableLoader, assignerFactory); } {code} The end result is still simple for users if they don't need to keep a reference to the IcebergSource object. {code} final DataStream stream = env.fromSource( IcebergSource.useSimpleAssigner(tableLoader()) .iteratorFactory(new RowDataIteratorFactory()) .config(config) .scanContext(scanContext) .build(), ... {code} > Make FileSource extensible > -- > > Key: FLINK-19799 > URL: https://issues.apache.org/jira/browse/FLINK-19799 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Fix For: 1.12.0 > > > The File System Source currently assumes all formats can represent their work > units as {{FileSourceSplit}}. If that is not the case, the formats cannot be > implemented using the {{FileSource}}. > We need to support extending the splits to carry additional information in > the splits, and to use that information when creating bulk readers and > handling split state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
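The builder pattern described in the comment above can be boiled down to a self-contained sketch. The class names here (`SplitAssigner`, `SimpleAssigner`, `Source`, `Builder`) are illustrative, not the actual Iceberg classes: the generic type parameters live on the source and builder, while a static factory method pins them to a concrete assigner so callers never have to spell them out.

```java
public class GenericBuilderSketch {
    // Pluggable assigner, parameterized by its checkpoint state type.
    public interface SplitAssigner<StateT> {
        StateT snapshot(); // snapshot the assigner-specific state for a checkpoint
    }

    public static class SimpleAssignerState {}

    public static class SimpleAssigner implements SplitAssigner<SimpleAssignerState> {
        @Override
        public SimpleAssignerState snapshot() {
            return new SimpleAssignerState();
        }
    }

    // The source carries the assigner's type parameters...
    public static class Source<StateT, A extends SplitAssigner<StateT>> {
        public final A assigner;
        Source(A assigner) {
            this.assigner = assigner;
        }
    }

    public static class Builder<StateT, A extends SplitAssigner<StateT>> {
        private final A assigner;
        Builder(A assigner) {
            this.assigner = assigner;
        }
        public Source<StateT, A> build() {
            return new Source<>(assigner);
        }
    }

    // ...but the static factory fixes them, so user code stays generics-free.
    public static Builder<SimpleAssignerState, SimpleAssigner> useSimpleAssigner() {
        return new Builder<>(new SimpleAssigner());
    }
}
```

This mirrors the `IcebergSource.useSimpleAssigner(...)` call shown above: each factory method binds one assigner family, and the fluent chain infers the rest.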
[jira] [Updated] (FLINK-19816) Flink restored from a wrong checkpoint (a very old one and not the last completed one)
[ https://issues.apache.org/jira/browse/FLINK-19816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-19816: --- Description: h2. Summary Upon failure, it seems that Flink didn't restore from the last completed checkpoint. Instead, it restored from a very old checkpoint. As a result, the Kafka offsets were invalid, causing the job to replay from the beginning because the Kafka consumer "auto.offset.reset" was set to "EARLIEST". This is an embarrassingly parallel stateless job. Parallelism is over 1,000. I have the full log file from jobmanager at INFO level. h2. Sequence of events from the logs Just before the failure, checkpoint *210768* completed. {code} 2020-10-25 02:35:05,970 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [jobmanager-future-thread-5] - Completed checkpoint 210768 for job 233b4938179c06974e4535ac8a868675 (4623776 bytes in 120402 ms). {code} During restart, somehow it decided to restore from a very old checkpoint *203531*. {code:java} 2020-10-25 02:36:03,301 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-3] - Start SessionDispatcherLeaderProcess. 2020-10-25 02:36:03,302 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Recover all persisted job graphs. 2020-10-25 02:36:03,304 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/XM3B/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/c31aec1e-07a7-4193-aa00-3fbe83f9e2e6 2020-10-25 02:36:03,307 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Trying to recover job with job id 233b4938179c06974e4535ac8a868675. 
2020-10-25 02:36:03,381 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/Hh86/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/4ab92f70-dfcd-4212-9b7f-bdbecb9257fd ... 2020-10-25 02:36:03,427 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Recovering checkpoints from ZooKeeper. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Found 0 checkpoints in ZooKeeper. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Trying to fetch 0 checkpoints from storage. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [flink-akka.actor.default-dispatcher-82003] - Starting job 233b4938179c06974e4535ac8a868675 from savepoint s3:///checkpoints/metadata/clapp_avro-clapp_avro_nontvui/1113/47e2a25a8d0b696c7d0d423722bb6f54/chk-203531/_metadata () {code} was: h2. Summary Upon failure, it seems that Flink didn't restore from the last completed checkpoint. Instead, it restored from a very old checkpoint. As a result, Kafka offsets are invalid and caused the job to replay from the beginning as Kafka consumer "auto.offset.reset" was set to "EARLIEST". This is an embarrassingly parallel stateless job. Parallelism is over 1,000. I have the full log file from jobmanager. h2. Sequence of events from the logs Just before the failure, checkpoint *210768* completed. {code} 2020-10-25 02:35:05,970 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [jobmanager-future-thread-5] - Completed checkpoint 210768 for job 233b4938179c06974e4535ac8a868675 (4623776 bytes in 120402 ms). {code} During restart, somehow it decided to restore from a very old checkpoint *203531*. 
{code:java} 2020-10-25 02:36:03,301 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-3] - Start SessionDispatcherLeaderProcess. 2020-10-25 02:36:03,302 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Recover all persisted job graphs. 2020-10-25 02:36:03,304 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/XM3B/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/c31aec1e-07a7-4193-aa00-3fbe83f9e2e6 2020-10-25 02:36:03,307 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Trying to recover job with job id 233b4938179c06974e4535ac8a868675. 2020-10-25 02:36:03,381 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/Hh86/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/4ab92f70-dfcd-4212-9b7f-bdbecb9257fd
[jira] [Updated] (FLINK-19816) Flink restored from a wrong checkpoint (a very old one and not the last completed one)
[ https://issues.apache.org/jira/browse/FLINK-19816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-19816: --- Description: h2. Summary Upon failure, it seems that Flink didn't restore from the last completed checkpoint. Instead, it restored from a very old checkpoint. As a result, the Kafka offsets were invalid, causing the job to replay from the beginning because the Kafka consumer "auto.offset.reset" was set to "EARLIEST". This is an embarrassingly parallel stateless job. Parallelism is over 1,000. I have the full log file from jobmanager. h2. Sequence of events from the logs Just before the failure, checkpoint *210768* completed. {code} 2020-10-25 02:35:05,970 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [jobmanager-future-thread-5] - Completed checkpoint 210768 for job 233b4938179c06974e4535ac8a868675 (4623776 bytes in 120402 ms). {code} During restart, somehow it decided to restore from a very old checkpoint *203531*. {code:java} 2020-10-25 02:36:03,301 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-3] - Start SessionDispatcherLeaderProcess. 2020-10-25 02:36:03,302 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Recover all persisted job graphs. 2020-10-25 02:36:03,304 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/XM3B/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/c31aec1e-07a7-4193-aa00-3fbe83f9e2e6 2020-10-25 02:36:03,307 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Trying to recover job with job id 233b4938179c06974e4535ac8a868675. 
2020-10-25 02:36:03,381 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/Hh86/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/4ab92f70-dfcd-4212-9b7f-bdbecb9257fd ... 2020-10-25 02:36:03,427 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Recovering checkpoints from ZooKeeper. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Found 0 checkpoints in ZooKeeper. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Trying to fetch 0 checkpoints from storage. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [flink-akka.actor.default-dispatcher-82003] - Starting job 233b4938179c06974e4535ac8a868675 from savepoint s3:///checkpoints/metadata/clapp_avro-clapp_avro_nontvui/1113/47e2a25a8d0b696c7d0d423722bb6f54/chk-203531/_metadata () {code} was: h2. Summary Upon failure, it seems that Flink didn't restore from the last completed checkpoint. Instead, it restored from a very old checkpoint. As a result, Kafka offsets are invalid and caused the job to replay from the beginning as Kafka consumer "auto.offset.reset" was set to "EARLIEST". This is an embarrassingly parallel stateless job. Parallelism is over 1,000. h2. Sequence of events from the logs Just before the failure, checkpoint *210768* completed. {code} 2020-10-25 02:35:05,970 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [jobmanager-future-thread-5] - Completed checkpoint 210768 for job 233b4938179c06974e4535ac8a868675 (4623776 bytes in 120402 ms). {code} During restart, somehow it decided to restore from checkpoint *203531*. 
{code:java} 2020-10-25 02:36:03,301 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-3] - Start SessionDispatcherLeaderProcess. 2020-10-25 02:36:03,302 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Recover all persisted job graphs. 2020-10-25 02:36:03,304 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/XM3B/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/c31aec1e-07a7-4193-aa00-3fbe83f9e2e6 2020-10-25 02:36:03,307 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Trying to recover job with job id 233b4938179c06974e4535ac8a868675. 2020-10-25 02:36:03,381 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/Hh86/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/4ab92f70-dfcd-4212-9b7f-bdbecb9257fd ... 2020-10-25 02:36:03,427 INFO
[jira] [Created] (FLINK-19816) Flink restored from a wrong checkpoint (a very old one and not the last completed one)
Steven Zhen Wu created FLINK-19816: -- Summary: Flink restored from a wrong checkpoint (a very old one and not the last completed one) Key: FLINK-19816 URL: https://issues.apache.org/jira/browse/FLINK-19816 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.11.0 Reporter: Steven Zhen Wu h2. Summary Upon failure, it seems that Flink didn't restore from the last completed checkpoint. Instead, it restored from a very old checkpoint. As a result, the Kafka offsets were invalid, causing the job to replay from the beginning because the Kafka consumer "auto.offset.reset" was set to "EARLIEST". This is an embarrassingly parallel stateless job. Parallelism is over 1,000. h2. Sequence of events from the logs Just before the failure, checkpoint *210768* completed. {code} 2020-10-25 02:35:05,970 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [jobmanager-future-thread-5] - Completed checkpoint 210768 for job 233b4938179c06974e4535ac8a868675 (4623776 bytes in 120402 ms). {code} During restart, somehow it decided to restore from checkpoint *203531*. {code:java} 2020-10-25 02:36:03,301 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-3] - Start SessionDispatcherLeaderProcess. 2020-10-25 02:36:03,302 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Recover all persisted job graphs. 2020-10-25 02:36:03,304 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/XM3B/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/c31aec1e-07a7-4193-aa00-3fbe83f9e2e6 2020-10-25 02:36:03,307 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [cluster-io-thread-5] - Trying to recover job with job id 233b4938179c06974e4535ac8a868675. 
2020-10-25 02:36:03,381 INFO com.netflix.bdp.s3fs.BdpS3FileSystem [cluster-io-thread-25] - Deleting path: s3:///checkpoints/Hh86/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/4ab92f70-dfcd-4212-9b7f-bdbecb9257fd ... 2020-10-25 02:36:03,427 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Recovering checkpoints from ZooKeeper. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Found 0 checkpoints in ZooKeeper. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore [flink-akka.actor.default-dispatcher-82003] - Trying to fetch 0 checkpoints from storage. 2020-10-25 02:36:03,432 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [flink-akka.actor.default-dispatcher-82003] - Starting job 233b4938179c06974e4535ac8a868675 from savepoint s3:///checkpoints/metadata/clapp_avro-clapp_avro_nontvui/1113/47e2a25a8d0b696c7d0d423722bb6f54/chk-203531/_metadata () {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
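The log sequence above ("Found 0 checkpoints in ZooKeeper", then "Starting job ... from savepoint ... chk-203531") suggests the fallback decision at play: when the completed-checkpoint store yields nothing, recovery falls back to the savepoint path the job was originally submitted with, which can be a very old checkpoint. The following is only a sketch of that decision logic under that assumption; the method names are illustrative, not Flink internals.

```java
import java.util.List;
import java.util.Optional;

public class RestoreDecisionSketch {
    // checkpointsInStore: completed checkpoints recovered from the HA store
    // (ZooKeeper in the logs above), ordered oldest to newest.
    // savepointPath: the restore path configured at job submission time.
    public static String chooseRestorePath(List<String> checkpointsInStore, String savepointPath) {
        // The latest completed checkpoint wins whenever the store has any.
        Optional<String> latest = checkpointsInStore.isEmpty()
                ? Optional.empty()
                : Optional.of(checkpointsInStore.get(checkpointsInStore.size() - 1));
        // "Found 0 checkpoints in ZooKeeper" => fall back to the (possibly
        // stale) savepoint path, reproducing the symptom in this report.
        return latest.orElse(savepointPath);
    }
}
```

Under this model, the bug symptom reduces to the store coming back empty even though checkpoint 210768 had just completed, so the stale savepoint path won.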
[jira] [Comment Edited] (FLINK-19698) Add close() method and onCheckpointComplete() to the Source.
[ https://issues.apache.org/jira/browse/FLINK-19698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217111#comment-17217111 ] Steven Zhen Wu edited comment on FLINK-19698 at 10/20/20, 12:59 AM: Since FLIP-27 is the unified source interface for both streaming and batch modes, what does the checkpoint method mean for batch mode? was (Author: stevenz3wu): Since FLIP-27 is the unified source interface for both streaming and batch modes, what does it the checkpoint method mean for batch mode? > Add close() method and onCheckpointComplete() to the Source. > > > Key: FLINK-19698 > URL: https://issues.apache.org/jira/browse/FLINK-19698 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.11.2 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Major > > Right now there are some caveats to the new Source API, observed from the > implementation of some connectors. We would like to make the following > improvements to the current Source API. > # Add the following method to the {{SplitReader}} API. > {{public void close() throws Exception;}} > This method allows the SplitReader implementations to be closed properly when > the split fetcher exits. > # Add the following method to the {{SourceReader}} API. > {{public void checkpointComplete(long checkpointId);}} > This method allows the {{SourceReader}} to take some cleanup / reporting > actions when a checkpoint has been successfully taken. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18044) Add the subtask index information to the SourceReaderContext.
[ https://issues.apache.org/jira/browse/FLINK-18044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217115#comment-17217115 ] Steven Zhen Wu commented on FLINK-18044: I agree with Stephan that code should avoid explicit assumptions about the parallelism and subtask index. But I also see the value of exposing the subtask index for non-essential purposes, e.g.: * right now, the Kafka source adds a random number as the `client.id` suffix. The subtask index would be a more appropriate alternative. * logging with the subtask index can help with troubleshooting. > Add the subtask index information to the SourceReaderContext. > - > > Key: FLINK-18044 > URL: https://issues.apache.org/jira/browse/FLINK-18044 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Common >Reporter: Jiangjie Qin >Priority: Major > Labels: pull-request-available > > It is useful for the `SourceReader` to retrieve its subtask id. For example, > Kafka readers can create a consumer with a proper client id. -- This message was sent by Atlassian Jira (v8.3.4#803005)
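The `client.id` suggestion above can be sketched in a few lines. The `ReaderContext` interface and `getIndexOfSubtask` accessor below are hypothetical stand-ins for whatever accessor the `SourceReaderContext` would expose; only the naming scheme is the point.

```java
public class ClientIdSketch {
    // Hypothetical stand-in for a reader context that exposes the subtask index.
    public interface ReaderContext {
        int getIndexOfSubtask();
    }

    // Deterministic client.id: the same subtask gets the same id across
    // restarts (unlike a random suffix), which also makes broker-side logs
    // and metrics easy to correlate with a specific subtask.
    public static String clientId(String jobName, ReaderContext ctx) {
        return jobName + "-reader-" + ctx.getIndexOfSubtask();
    }
}
```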
[jira] [Commented] (FLINK-19698) Add close() method and onCheckpointComplete() to the Source.
[ https://issues.apache.org/jira/browse/FLINK-19698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217111#comment-17217111 ] Steven Zhen Wu commented on FLINK-19698: Since FLIP-27 is the unified source interface for both streaming and batch modes, what does the checkpoint method mean for batch mode? > Add close() method and onCheckpointComplete() to the Source. > > > Key: FLINK-19698 > URL: https://issues.apache.org/jira/browse/FLINK-19698 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common >Affects Versions: 1.11.2 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Major > > Right now there are some caveats to the new Source API, observed from the > implementation of some connectors. We would like to make the following > improvements to the current Source API. > # Add the following method to the {{SplitReader}} API. > {{public void close() throws Exception;}} > This method allows the SplitReader implementations to be closed properly when > the split fetcher exits. > # Add the following method to the {{SourceReader}} API. > {{public void checkpointComplete(long checkpointId);}} > This method allows the {{SourceReader}} to take some cleanup / reporting > actions when a checkpoint has been successfully taken. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
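A minimal sketch of the two methods proposed in the quoted issue, written here as default methods so existing implementations would keep compiling. The interface shapes are illustrative, not Flink's actual `SplitReader`/`SourceReader`, and the actual proposal may add them as abstract methods instead.

```java
public class SourceApiSketch {
    // Proposed addition #1: let SplitReader implementations release resources
    // properly when the split fetcher exits.
    public interface SplitReader {
        default void close() throws Exception {}
    }

    // Proposed addition #2: notify the SourceReader once a checkpoint has been
    // successfully taken, so it can run cleanup / reporting actions.
    public interface SourceReader {
        default void checkpointComplete(long checkpointId) {}
    }

    // Example reader that records the last successfully completed checkpoint.
    public static class CountingReader implements SourceReader {
        public long lastCompleted = -1L;

        @Override
        public void checkpointComplete(long checkpointId) {
            lastCompleted = checkpointId;
        }
    }
}
```

The default-method form mirrors the backward-compatibility argument made for `Partitioner#setup` elsewhere in this thread: readers that don't care simply inherit the no-op.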
[jira] [Comment Edited] (FLINK-19401) Job stuck in restart loop due to excessive checkpoint recoveries which block the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-19401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17208437#comment-17208437 ] Steven Zhen Wu edited comment on FLINK-19401 at 10/6/20, 12:42 AM: --- [~roman_khachatryan] I don't know if repeated checkpoint recovery is the root/main cause or not. [~trohrmann] identified two problems during his investigation. This is one of the identified problems. Regarding the logs related to Titus, please ignore them. They are just noise. We haven't cleaned up our logs yet. was (Author: stevenz3wu): [~roman_khachatryan] I don't know if repeated checkpoint recovery is the root/main cause or not. [~trohrmann] identified two problems during his investigation. This is one of the identified problem. Regarding the logs related to Titusm please ignore them. They are just noise. We haven't cleaned up our logs yet. > Job stuck in restart loop due to excessive checkpoint recoveries which block > the JobMaster > -- > > Key: FLINK-19401 > URL: https://issues.apache.org/jira/browse/FLINK-19401 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.10.1, 1.11.2 >Reporter: Steven Zhen Wu >Priority: Critical > Fix For: 1.12.0, 1.10.3, 1.11.3 > > > Flink job sometimes got into a restart loop for many hours and couldn't recover > until redeployed. We had some issue with Kafka that initially caused the job > to restart. > Below is the first of the many exceptions for "ResourceManagerException: > Could not find registered job manager" error. > {code} > 2020-09-19 00:03:31,614 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl > [flink-akka.actor.default-dispatcher-35973] - Requesting new slot > [SlotRequestId{171f1df017dab3a42c032abd07908b9b}] and profile ResourceProfile{UNKNOWN} from resource manager. 
> 2020-09-19 00:03:31,615 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl > [flink-akka.actor.default-dispatcher-35973] - Requesting new slot > [SlotRequestId{cc7d136c4ce1f32285edd4928e3ab2e2}] and profile > ResourceProfile{UNKNOWN} from resource manager. > 2020-09-19 00:03:31,615 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl > [flink-akka.actor.default-dispatcher-35973] - Requesting new slot > [SlotRequestId{024c8a48dafaf8f07c49dd4320d5cc94}] and profile > ResourceProfile{UNKNOWN} from resource manager. > 2020-09-19 00:03:31,615 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl > [flink-akka.actor.default-dispatcher-35973] - Requesting new slot > [SlotRequestId{a591eda805b3081ad2767f5641d0db06}] and profile > ResourceProfile{UNKNOWN} from resource manager. > 2020-09-19 00:03:31,620 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph > [flink-akka.actor.default-dispatcher-35973] - Source: k2-csevpc -> > k2-csevpcRaw -> (vhsPlaybackEvents -> Flat Map, merchImpressionsClientLog -> > Flat Map) (56/640) (1b0d3dd1f19890886ff373a3f08809e8) switched from SCHEDULED > to FAILED. 
> java.util.concurrent.CompletionException: > java.util.concurrent.CompletionException: > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > No pooled slot available and request to ResourceManager for new slot failed > at > org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:433) > at > java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) > at > java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) > at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$forward$21(FutureUtils.java:1065) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > at > java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792) > at > java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153) > at > org.apache.flink.runtime.concurrent.FutureUtils.forward(FutureUtils.java:1063) > at > org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager.createRootSlot(SlotSharingManager.java:155) > at > org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateMultiTaskSlot(SchedulerImpl.java:511) > at > org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSharedSlot(SchedulerImpl.java:311) > at >
[jira] [Commented] (FLINK-19401) Job stuck in restart loop due to excessive checkpoint recoveries which block the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-19401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17204732#comment-17204732 ] Steven Zhen Wu commented on FLINK-19401: [~trohrmann] just to clarify, do you mean the operator chain caused the repeated checkpoint recoveries? i.e. if an operator chain has 3 operators, we repeat checkpoint recovery 3 times no matter whether the 3 operators have state or not. In this job, only the two Kafka sources and the co-process functions have state. It has 3 operator chains as Till described (two operator chains for the sources, one operator chain for co-process+sink).
[jira] [Commented] (FLINK-19401) Job stuck in restart loop due to excessive checkpoint recoveries which block the JobMaster
[ https://issues.apache.org/jira/browse/FLINK-19401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17204031#comment-17204031 ] Steven Zhen Wu commented on FLINK-19401: [~trohrmann] thanks a lot for looking into the problem. Just a small clarification on the job, it is a stream join job with keyBy and co-process functions.
[jira] [Comment Edited] (FLINK-19401) Job stuck in restart loop due to "Could not find registered job manager"
[ https://issues.apache.org/jira/browse/FLINK-19401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203298#comment-17203298 ] Steven Zhen Wu edited comment on FLINK-19401 at 9/29/20, 12:25 AM: --- [~trohrmann] emailed you the JM logs with INFO level logging. It is not easily reproducible, maybe it happens once every 1-2 weeks for a high-parallelism (~1,500) and large state (TBs) job. If DEBUG is required, we can try to enable the DEBUG level logging and wait for it to happen again.
[jira] [Updated] (FLINK-19401) Job stuck in restart loop due to "Could not find registered job manager"
[ https://issues.apache.org/jira/browse/FLINK-19401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-19401: --- Description: Flink job sometimes got into a restart loop for many hours and can't recover until redeployed. We had some issue with Kafka that initially caused the job to restart. Below is the first of the many exceptions for "ResourceManagerException: Could not find registered job manager" error. {code} 2020-09-19 00:03:31,614 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [flink-akka.actor.default-dispatcher-35973] - Requesting new slot [SlotRequestId{171f1df017dab3a42c032abd07908b9b}] and profile ResourceProfile{UNKNOWN} from resource manager. 2020-09-19 00:03:31,615 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [flink-akka.actor.default-dispatcher-35973] - Requesting new slot [SlotRequestId{cc7d136c4ce1f32285edd4928e3ab2e2}] and profile ResourceProfile{UNKNOWN} from resource manager. 2020-09-19 00:03:31,615 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [flink-akka.actor.default-dispatcher-35973] - Requesting new slot [SlotRequestId{024c8a48dafaf8f07c49dd4320d5cc94}] and profile ResourceProfile{UNKNOWN} from resource manager. 2020-09-19 00:03:31,615 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [flink-akka.actor.default-dispatcher-35973] - Requesting new slot [SlotRequestId{a591eda805b3081ad2767f5641d0db06}] and profile ResourceProfile{UNKNOWN} from resource manager. 2020-09-19 00:03:31,620 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [flink-akka.actor.default-dispatcher-35973] - Source: k2-csevpc -> k2-csevpcRaw -> (vhsPlaybackEvents -> Flat Map, merchImpressionsClientLog -> Flat Map) (56/640) (1b0d3dd1f19890886ff373a3f08809e8) switched from SCHEDULED to FAILED. 
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:433) at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) at org.apache.flink.runtime.concurrent.FutureUtils.lambda$forward$21(FutureUtils.java:1065) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792) at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153) at org.apache.flink.runtime.concurrent.FutureUtils.forward(FutureUtils.java:1063) at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager.createRootSlot(SlotSharingManager.java:155) at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateMultiTaskSlot(SchedulerImpl.java:511) at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSharedSlot(SchedulerImpl.java:311) at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.internalAllocateSlot(SchedulerImpl.java:160) at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSlotInternal(SchedulerImpl.java:143) at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSlot(SchedulerImpl.java:113) at org.apache.flink.runtime.executiongraph.SlotProviderStrategy$NormalSlotProviderStrategy.allocateSlot(SlotProviderStrategy.java:115) at 
org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator.lambda$allocateSlotsFor$0(DefaultExecutionSlotAllocator.java:104) at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) at org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator.allocateSlotsFor(DefaultExecutionSlotAllocator.java:102) at org.apache.flink.runtime.scheduler.DefaultScheduler.allocateSlots(DefaultScheduler.java:342) at org.apache.flink.runtime.scheduler.DefaultScheduler.allocateSlotsAndDeploy(DefaultScheduler.java:311) at org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy.allocateSlotsAndDeploy(EagerSchedulingStrategy.java:76) at org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy.restartTasks(EagerSchedulingStrategy.java:57) at
[jira] [Created] (FLINK-19401) Job stuck in restart loop due to "Could not find registered job manager"
Steven Zhen Wu created FLINK-19401: -- Summary: Job stuck in restart loop due to "Could not find registered job manager" Key: FLINK-19401 URL: https://issues.apache.org/jira/browse/FLINK-19401 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.10.1 Reporter: Steven Zhen Wu Flink job sometimes got into a restart loop for many hours and can't recover until redeployed. We had some issue with Kafka that initially caused the job to restart. Below is the first of the many exceptions for "ResourceManagerException: Could not find registered job manager" error. {code} 2020-09-19 00:03:31,614 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [flink-akka.actor.default-dispatcher-35973] - Requesting new slot [SlotRequestId{171f1df017dab3a42c032abd07908b9b}] and profile ResourceP rofile{UNKNOWN} from resource manager. 2020-09-19 00:03:31,615 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [flink-akka.actor.default-dispatcher-35973] - Requesting new slot [SlotRequestId{cc7d136c4ce1f32285edd4928e3ab2e2}] and profile ResourceProfile{UNKNOWN} from resource manager. 2020-09-19 00:03:31,615 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [flink-akka.actor.default-dispatcher-35973] - Requesting new slot [SlotRequestId{024c8a48dafaf8f07c49dd4320d5cc94}] and profile ResourceProfile{UNKNOWN} from resource manager. 2020-09-19 00:03:31,615 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [flink-akka.actor.default-dispatcher-35973] - Requesting new slot [SlotRequestId{a591eda805b3081ad2767f5641d0db06}] and profile ResourceProfile{UNKNOWN} from resource manager. 2020-09-19 00:03:31,620 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [flink-akka.actor.default-dispatcher-35973] - Source: k2-csevpc -> k2-csevpcRaw -> (vhsPlaybackEvents -> Flat Map, merchImpressionsClientLog -> Flat Map) (56/640) (1b0d3dd1f19890886ff373a3f08809e8) switched from SCHEDULED to FAILED. 
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: No pooled slot available and request to ResourceManager for new slot failed
	at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.lambda$new$0(SlotSharingManager.java:433)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$forward$21(FutureUtils.java:1065)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
	at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
	at org.apache.flink.runtime.concurrent.FutureUtils.forward(FutureUtils.java:1063)
	at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager.createRootSlot(SlotSharingManager.java:155)
	at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateMultiTaskSlot(SchedulerImpl.java:511)
	at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSharedSlot(SchedulerImpl.java:311)
	at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.internalAllocateSlot(SchedulerImpl.java:160)
	at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSlotInternal(SchedulerImpl.java:143)
	at org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl.allocateSlot(SchedulerImpl.java:113)
	at org.apache.flink.runtime.executiongraph.SlotProviderStrategy$NormalSlotProviderStrategy.allocateSlot(SlotProviderStrategy.java:115)
	at org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator.lambda$allocateSlotsFor$0(DefaultExecutionSlotAllocator.java:104)
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
	at org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator.allocateSlotsFor(DefaultExecutionSlotAllocator.java:102)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.allocateSlots(DefaultScheduler.java:342)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.allocateSlotsAndDeploy(DefaultScheduler.java:311)
	at
{code}
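Independent of the root cause above, a bounded restart strategy makes a job like this fail fast instead of looping for hours. A hedged flink-conf.yaml sketch (the attempt count and delay are illustrative, not recommendations):

```yaml
# flink-conf.yaml -- illustrative values only: fail the job after 20
# consecutive restart attempts instead of restarting indefinitely
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 20
restart-strategy.fixed-delay.delay: 30 s
```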
[jira] [Commented] (FLINK-19016) Checksum mismatch when restore from RocksDB
[ https://issues.apache.org/jira/browse/FLINK-19016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17199720#comment-17199720 ] Steven Zhen Wu commented on FLINK-19016: [~sewen] we didn't dig deeper and we don't know how to reproduce it. But your hypothesis sounds reasonable to me. > Checksum mismatch when restore from RocksDB > --- > > Key: FLINK-19016 > URL: https://issues.apache.org/jira/browse/FLINK-19016 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.11.1 >Reporter: Jiayi Liao >Priority: Major > > The error stack is shown below: > {code:java} > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for > KeyedMapBundleOperator_44cfc1ca74b40bb44eed1f38f72b3ea9_(71/300) from any of > the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135) > ... 6 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught > unexpected exception. 
> at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:333) > at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:580) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) > ... 8 more > Caused by: java.io.IOException: Error while opening RocksDB instance. > at > org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:74) > at > org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:131) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:214) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:277) > ... 
12 more > Caused by: org.rocksdb.RocksDBException: checksum mismatch > at org.rocksdb.RocksDB.open(Native Method) > at org.rocksdb.RocksDB.open(RocksDB.java:286) > at > org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:66) > ... 18 more > {code} > The machine went down because of a hardware problem, and then the job could not > restart successfully anymore. After digging a little bit, I found that > RocksDB in Flink uses sync instead of fsync to synchronize the data with the > disk. With the sync operation, RocksDB cannot guarantee that the current > in-progress file is persisted on disk in takeDBNativeCheckpoint. -- This message was sent by Atlassian Jira (v8.3.4#803005)
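The sync-vs-fsync distinction mentioned in the description can be illustrated with plain Java, outside of RocksDB: POSIX sync() merely schedules dirty pages for write-back, while fsync() blocks until the file's data actually reaches the device. In Java, `FileChannel.force(true)` plays the role of fsync. A minimal sketch (not Flink code; file names are arbitrary):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DurableWrite {
    // Write bytes and force them to disk before returning, like fsync(2).
    static void writeDurably(Path path, byte[] data) throws IOException {
        try (FileChannel ch = FileChannel.open(path,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buf = ByteBuffer.wrap(data);
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            // force(true) flushes both file data and metadata (fsync-like);
            // without it, write() may return while the bytes still sit in
            // the OS page cache and can be lost on a crash or power failure.
            ch.force(true);
        }
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("durable", ".bin");
        writeDurably(p, "checkpoint-data".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(Files.readAllBytes(p), StandardCharsets.UTF_8));
        Files.delete(p);
    }
}
```

If a checkpoint file is only written with the weaker sync semantics, a crash between write-back scheduling and the actual disk write leaves a truncated or corrupted SST file, which would surface exactly as a checksum mismatch on restore.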
[jira] [Commented] (FLINK-19016) Checksum mismatch when restore from RocksDB
[ https://issues.apache.org/jira/browse/FLINK-19016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197779#comment-17197779 ] Steven Zhen Wu commented on FLINK-19016: We have seen a similar problem when the TM disk is full: the checkpoint completed, but a corrupted/incomplete RocksDB file was uploaded. There may be a bug somewhere that did not bubble the failure up.
[jira] [Commented] (FLINK-16867) Simplify default timeout configuration
[ https://issues.apache.org/jira/browse/FLINK-16867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17164519#comment-17164519 ] Steven Zhen Wu commented on FLINK-16867: I should clarify that web.timeout becomes irrelevant to us for job submission via REST API. It seems that only client.timeout matters for that particular code path of submitting job. > Simplify default timeout configuration > -- > > Key: FLINK-16867 > URL: https://issues.apache.org/jira/browse/FLINK-16867 > Project: Flink > Issue Type: Improvement > Components: Runtime / Configuration, Runtime / Coordination >Affects Versions: 1.9.2, 1.10.0, 1.11.0 >Reporter: Till Rohrmann >Priority: Minor > Fix For: 1.12.0 > > > At the moment, Flink has several timeout options: > * {{akka.ask.timeout}}: Timeout for intra cluster RPCs (JM <\-> RM <\-> TE) > * {{web.timeout}}: Timeout for RPCs between REST handlers and RM, JM, TE > At the moment, these values are separately configured. This requires the user > to know about both configuration options and that Flink has multiple timeout > values. > In order to simplify setups I would suggest that {{web.timeout}} defaults to > {{akka.ask.timeout}}, if {{web.timeout}} has not been explicitly configured. > This has the benefits that the user only need to know about a single timeout > value which is applied cluster wide. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16867) Simplify default timeout configuration
[ https://issues.apache.org/jira/browse/FLINK-16867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163667#comment-17163667 ] Steven Zhen Wu commented on FLINK-16867: [~trohrmann] just to follow up on your comment on the other jira: https://issues.apache.org/jira/browse/FLINK-11143?focusedCommentId=17161779=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17161779 Now with 1.11 there is a third timeout, `client.timeout`, and all three have different defaults:
* web.timeout is 600,000 milliseconds
* akka.ask.timeout is `10 s`
* client.timeout is `1 min`
With 1.11, web.timeout becomes irrelevant for job submission via the REST API; only `client.timeout` matters.
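Since the three timeouts have different defaults and even different units, pinning them explicitly in flink-conf.yaml avoids surprises. A sketch with illustrative values (not recommendations; key names per Flink 1.11-era configuration):

```yaml
# flink-conf.yaml -- illustrative values only
akka.ask.timeout: 120 s      # intra-cluster RPC (JM <-> RM <-> TM)
web.timeout: 120000          # REST handler <-> RM/JM/TM RPC, in milliseconds
client.timeout: 120 s        # client-side timeout for job submission (1.11+)
```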
[jira] [Commented] (FLINK-11143) AskTimeoutException is thrown during job submission and completion
[ https://issues.apache.org/jira/browse/FLINK-11143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17162965#comment-17162965 ] Steven Zhen Wu commented on FLINK-11143: [~trohrmann] thanks a lot for looking into it. Increasing `client.timeout` helped. Yeah, it is a little confusing. At the least, it would help to point out this `web.timeout` vs `client.timeout` nuance in the release notes. > AskTimeoutException is thrown during job submission and completion > -- > > Key: FLINK-11143 > URL: https://issues.apache.org/jira/browse/FLINK-11143 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.6.2, 1.10.0 >Reporter: Alex Vinnik >Priority: Critical > Attachments: flink-job-timeline.PNG > > > For more details please see the thread > [http://mail-archives.apache.org/mod_mbox/flink-user/201812.mbox/%3cc2fb26f9-1410-4333-80f4-34807481b...@gmail.com%3E] > On submission > 2018-12-12 02:28:31 ERROR JobsOverviewHandler:92 - Implementation error: > Unhandled exception. > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#225683351|#225683351]] after [1 ms]. > Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". 
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) > at > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at > scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) > at java.lang.Thread.run(Thread.java:748) > > On completion > > {"errors":["Internal server error."," side:\njava.util.concurrent.CompletionException: > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#105638574]] after [1 ms]. > Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\". 
> at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772) > at akka.dispatch.OnComplete.internal(Future.scala:258) > at akka.dispatch.OnComplete.internal(Future.scala:256) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83) > at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) > at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) > at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) > at > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) > at > 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) > at java.lang.Thread.run(Thread.java:748)\nCaused by: > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#105638574]] after [1 ms]. > Sender[null] sent message of type >
[jira] [Updated] (FLINK-18572) Flink web UI doesn't display checkpoint configs like unaligned and tolerable-failed-checkpoints and
[ https://issues.apache.org/jira/browse/FLINK-18572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-18572: --- Summary: Flink web UI doesn't display checkpoint configs like unaligned and tolerable-failed-checkpoints and (was: Flink web UI doesn't display unaligned checkpoint config) > Flink web UI doesn't display checkpoint configs like unaligned and > tolerable-failed-checkpoints and > - > > Key: FLINK-18572 > URL: https://issues.apache.org/jira/browse/FLINK-18572 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.11.0 >Reporter: Steven Zhen Wu >Priority: Major > Attachments: image-2020-07-12-10-14-49-990.png > > > might be helpful to display the unaligned checkpoint boolean flag in web UI. > > !image-2020-07-12-10-14-49-990.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
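For reference, the two settings named in the summary are configurable even though the web UI does not display them. A hedged flink-conf.yaml sketch (keys per the Flink 1.11-era `execution.checkpointing.*` options; values illustrative):

```yaml
# flink-conf.yaml -- illustrative snippet
execution.checkpointing.unaligned: true
execution.checkpointing.tolerable-failed-checkpoints: 3
```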
[jira] [Comment Edited] (FLINK-11143) AskTimeoutException is thrown during job submission and completion
[ https://issues.apache.org/jira/browse/FLINK-11143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156363#comment-17156363 ] Steven Zhen Wu edited comment on FLINK-11143 at 7/14/20, 5:38 PM: -- [~trohrmann] I am seeing a similar problem *when trying unaligned checkpoints with 1.11.0*. The Flink job actually started fine. We did not see this AskTimeoutException thrown during job submission without unaligned checkpoints (1.10 or 1.11). Some more context about the app:
* a large-state stream join app (a few TBs)
* parallelism: 1,440
* number of containers: 180
* cores per container: 12
* TM_TASK_SLOTS: 8
* akka.ask.timeout: 120 s
* heartbeat.timeout: 12
* web.timeout: 6 (also tried larger values like 300,000 or 600,000 without any difference)
I will send you the log files (with DEBUG level) in an email offline. Thanks a lot for your help in advance!
{code:java}
\"errors\":[\"Internal server error.\",\"
{code}