[jira] [Created] (FLINK-10969) expose API or metric for total number of keys stored in state backend

2018-11-21 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-10969:
--

 Summary: expose API or metric for total number of keys stored in 
state backend
 Key: FLINK-10969
 URL: https://issues.apache.org/jira/browse/FLINK-10969
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Reporter: Steven Zhen Wu


[~srichter] mentioned it might make sense to provide two versions: an exact 
count and an approximate count. For some state backends (like RocksDB), it may 
be much cheaper to get an approximate count.
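
As a rough illustration of why the approximate count is cheap for RocksDB, the 
backend can read an estimate that RocksDB already maintains internally (a 
minimal sketch against the RocksDB Java API; only the property name comes from 
RocksDB, the wrapper class is illustrative and not a proposed Flink API):
{code}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class ApproximateKeyCount {
    // RocksDB keeps a running estimate of the key count; an exact count would
    // require a full scan of the column family.
    static long estimateNumKeys(RocksDB db) throws RocksDBException {
        return Long.parseLong(db.getProperty("rocksdb.estimate-num-keys"));
    }
}
{code}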



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10969) expose API or metric for total number of keys stored in state backend

2018-11-21 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-10969:
---
Description: 
[~srichter] mentioned it might make sense to provide two versions: an exact 
count and an approximate count. For some state backends (like RocksDB), it may 
be much cheaper to get an approximate count.

Exposing these as metrics would be ideal.

Additionally, it would also be useful to get the total count of timers, which 
are also stored in the state backend. Stefan mentioned timers are just a 
different namespace in the state backend (e.g. a column family in RocksDB). So 
it would be very useful if the metrics had a _namespace_ tag.

  was:[~srichter] mentioned it might make sense to provide two versions: an 
exact count and an approximate count. For some state backends (like RocksDB), 
it may be much cheaper to get an approximate count.


> expose API or metric for total number of keys stored in state backend
> -
>
> Key: FLINK-10969
> URL: https://issues.apache.org/jira/browse/FLINK-10969
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Steven Zhen Wu
>Priority: Major
>
> [~srichter] mentioned it might make sense to provide two versions: an exact 
> count and an approximate count. For some state backends (like RocksDB), it may 
> be much cheaper to get an approximate count.
> Exposing these as metrics would be ideal.
> Additionally, it would also be useful to get the total count of timers, which 
> are also stored in the state backend. Stefan mentioned timers are just a 
> different namespace in the state backend (e.g. a column family in RocksDB). So 
> it would be very useful if the metrics had a _namespace_ tag.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10970) expose metric for total state size in terms of bytes

2018-11-21 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-10970:
--

 Summary: expose metric for total state size in terms of bytes
 Key: FLINK-10970
 URL: https://issues.apache.org/jira/browse/FLINK-10970
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Reporter: Steven Zhen Wu


With incremental checkpointing, the checkpoint size metric only captures the 
delta size. It would be very useful to have another metric that captures the 
total state size; even an approximate number would be super useful.
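
For the RocksDB backend, an approximate total in bytes could be derived from 
properties RocksDB already tracks (an illustrative sketch, not a proposed Flink 
API; only the property name comes from RocksDB):
{code}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class ApproximateStateSize {
    // Aggregate size of all SST files; a rough proxy for total state size in bytes.
    static long totalSstFileBytes(RocksDB db) throws RocksDBException {
        return Long.parseLong(db.getProperty("rocksdb.total-sst-files-size"));
    }
}
{code}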



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10452) Expose Additional Metrics to Reason about Statesize

2018-11-22 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16696156#comment-16696156
 ] 

Steven Zhen Wu commented on FLINK-10452:


I saw that two tickets I filed are marked as duplicates of this one. Let me 
capture the asks from the two closed tickets.

1) Total number of keys. It might make sense to provide two versions: an exact 
count and an approximate count. For some state backends (like RocksDB), it may 
be much cheaper to get an approximate count. Additionally, it would also be 
useful to get the total count of timers, which are also stored in the state 
backend. Stefan mentioned timers are just a different namespace in the state 
backend (e.g. a column family in RocksDB). So it would be very useful if the 
metrics had a _namespace_ tag.

2) With incremental checkpointing, the checkpoint size metric only captures the 
delta size. It would be very useful to have another metric that captures the 
total state size; even an approximate number would be super useful.

> Expose Additional Metrics to Reason about Statesize
> ---
>
> Key: FLINK-10452
> URL: https://issues.apache.org/jira/browse/FLINK-10452
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Konstantin Knauf
>Assignee: vinoyang
>Priority: Major
>
> For monitoring purposes it would be helpful if Flink could expose metrics 
> about the number of keys/windows for each registered keyed state. 
> Open Questions:
> * One Metric per Registered State? One Metric per KeyedOperator?
> * Performance Impact (should this be default behavior?)
> * Possible to know the number of windows during runtime?
> * RocksDB only gives you an estimate of the number of keys. It would be nice 
> if we could derive the exact number inside Flink. This would also help in 
> sizing the RocksDB instances and estimating their memory footprint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7883) Make savepoints atomic with respect to state and side effects

2018-11-24 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16697931#comment-16697931
 ] 

Steven Zhen Wu commented on FLINK-7883:
---

We would love to see this happen. It is the "graceful" shutdown that we need in 
order to minimize duplicates if we are going to enable aggressive/frequent 
rescale events; otherwise, we are going to see frequent and significant 
duplicates.

> Make savepoints atomic with respect to state and side effects
> -
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Kafka Connector, State Backends, 
> Checkpointing
>Affects Versions: 1.3.2, 1.4.0
>Reporter: Antoine Philippot
>Priority: Major
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call 
> once the savepoint is finished, but during the savepoint execution the Kafka 
> source continues to poll new messages, which will not be part of the savepoint 
> and will be replayed on the next application start.
> A solution could be to stop fetching in the source stream task before 
> triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a 
> method {{stopFetching}} that existing SourceFunction implementations could 
> implement.
> We could add a {{stopFetchingSource}} property to the 
> {{CheckpointOptions}} class to pass the desired behaviour from 
> {{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
> {{SourceStreamTask.triggerCheckpoint}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-35384) Expose metrics group to custom Partitioner

2024-05-16 Thread Steven Zhen Wu (Jira)
Steven Zhen Wu created FLINK-35384:
--

 Summary: Expose metrics group to custom Partitioner
 Key: FLINK-35384
 URL: https://issues.apache.org/jira/browse/FLINK-35384
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Affects Versions: 1.9.4
Reporter: Steven Zhen Wu


I am trying to implement a custom range partitioner in the Flink Iceberg sink 
and want to publish some counter metrics for certain scenarios, similar to the 
network metrics exposed in `TaskIOMetricGroup`.

We can implement the range partitioner using the public interface from 
`DataStream`:
```
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector)
```

We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
`CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
public `Partitioner` interface, where we can implement the custom range 
partitioner.

The `Partitioner` interface is a functional interface today. We can add a new 
default `setup` method without breaking backward compatibility:
```
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    *default void setup(TaskIOMetricGroup metrics) {}*
    int partition(K key, int numPartitions);
}
```

I know a public interface change requires a FLIP process; I will start one if 
the community agrees with this feature request.

Personally, I think `numPartitions` should be passed in the `setup` method too, 
but that is a breaking change that is NOT worth the benefit right now:
```
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
    int partition(K key);
}
```

That would be similar to the `StreamPartitioner#setup()` method, which we would 
need to modify to pass the metrics group:
```
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
    @Override
    public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
        this.numberOfChannels = numberOfChannels;
    }
```
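
To make the intended usage concrete, here is a rough sketch of how a 
user-defined range partitioner could use the proposed hook (illustrative only: 
the `setup(TaskIOMetricGroup)` method is the proposal above, not an existing 
Flink API, and the range bounds and metric name are made up):
```
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;

// Illustrative range partitioner; assumes the proposed setup(...) hook exists.
public class IllustrativeRangePartitioner implements Partitioner<Long> {
    private final long[] upperBounds = {100L, 1_000L, 10_000L};
    private transient Counter outOfRangeKeys;

    // Proposed hook: would be called by the runtime with the task's IO metric group.
    public void setup(TaskIOMetricGroup metrics) {
        this.outOfRangeKeys = metrics.counter("outOfRangeKeys");
    }

    @Override
    public int partition(Long key, int numPartitions) {
        for (int i = 0; i < upperBounds.length && i < numPartitions; i++) {
            if (key <= upperBounds[i]) {
                return i;
            }
        }
        if (outOfRangeKeys != null) {
            outOfRangeKeys.inc(); // count keys that fall outside the configured ranges
        }
        return numPartitions - 1;
    }
}
```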




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35384) Expose metrics group to custom Partitioner

2024-05-16 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-35384:
---
Description: 
I am trying to implement a custom range partitioner in the Flink Iceberg sink 
and want to publish some counter metrics for certain scenarios, similar to the 
network metrics exposed in `TaskIOMetricGroup`.

We can implement the range partitioner using the public interface from 
`DataStream`:
{code}
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector)
{code}

We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
`CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
public `Partitioner` interface, where we can implement the custom range 
partitioner.

The `Partitioner` interface is a functional interface today. We can add a new 
default `setup` method without breaking backward compatibility:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    *default void setup(TaskIOMetricGroup metrics) {}*
    int partition(K key, int numPartitions);
}
{code}

I know a public interface change requires a FLIP process; I will start one if 
the community agrees with this feature request.

Personally, I think `numPartitions` should be passed in the `setup` method too, 
but that is a breaking change that is NOT worth the benefit right now:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
    int partition(K key);
}
{code}

That would be similar to the `StreamPartitioner#setup()` method, which we would 
need to modify to pass the metrics group:
{code}
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
    @Override
    public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
        this.numberOfChannels = numberOfChannels;
    }
{code}


  was:
I am trying to implement a custom range partitioner in the Flink Iceberg sink 
and want to publish some counter metrics for certain scenarios, similar to the 
network metrics exposed in `TaskIOMetricGroup`.

We can implement the range partitioner using the public interface from 
`DataStream`:
```
public <K> DataStream<T> partitionCustom(
        Partitioner<K> partitioner, KeySelector<T, K> keySelector)
```

We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
`CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
public `Partitioner` interface, where we can implement the custom range 
partitioner.

The `Partitioner` interface is a functional interface today. We can add a new 
default `setup` method without breaking backward compatibility:
```
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    *default void setup(TaskIOMetricGroup metrics) {}*
    int partition(K key, int numPartitions);
}
```

I know a public interface change requires a FLIP process; I will start one if 
the community agrees with this feature request.

Personally, I think `numPartitions` should be passed in the `setup` method too, 
but that is a breaking change that is NOT worth the benefit right now:
```
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
    int partition(K key);
}
```

That would be similar to the `StreamPartitioner#setup()` method, which we would 
need to modify to pass the metrics group:
```
@Internal
public abstract class StreamPartitioner<T>
        implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable {
    @Override
    public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
        this.numberOfChannels = numberOfChannels;
    }
```



> Expose metrics group to custom Partitioner
> --
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.4
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg 
> sink and want to publish some counter metrics for certain scenarios, similar 
> to the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from 
> `DataStream`:
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
> `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
> public `Partitioner` interface, where we can implement the custom range 
> partitioner.
> `Partiti

[jira] [Updated] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner

2024-05-16 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-35384:
---
Summary: Expose TaskIOMetricGroup to custom Partitioner  (was: Expose 
metrics group to custom Partitioner)

> Expose TaskIOMetricGroup to custom Partitioner
> --
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.4
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg 
> sink and want to publish some counter metrics for certain scenarios, similar 
> to the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from 
> `DataStream`:
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
> `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
> public `Partitioner` interface, where we can implement the custom range 
> partitioner.
> The `Partitioner` interface is a functional interface today. We can add a new 
> default `setup` method without breaking backward compatibility:
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     *default void setup(TaskIOMetricGroup metrics) {}*
>     int partition(K key, int numPartitions);
> }
> {code}
> I know a public interface change requires a FLIP process; I will start one if 
> the community agrees with this feature request.
> Personally, I think `numPartitions` should be passed in the `setup` method 
> too, but that is a breaking change that is NOT worth the benefit right now:
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method, which we 
> would need to modify to pass the metrics group:
> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
>         Serializable {
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner

2024-05-16 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847116#comment-17847116
 ] 

Steven Zhen Wu commented on FLINK-35384:


One potential risk of this type of API is that it is not extensible: if we want 
to pass another argument to the partitioner, we have to break compatibility or 
add yet another method.
{code}
default void setup(TaskIOMetricGroup metrics) {}
{code}

Maybe we can move to the context model that is widely used in Flink:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    int partition(K key, int numPartitions);

    default void init(Context context) {}

    interface Context {
        int numberOfChannels();
        TaskIOMetricGroup metrics();
    }
}
{code}
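
For illustration, a rough sketch of how the runtime side could hand such a 
context to the user partitioner (the `Context` type is the proposal above; the 
adapter and its wiring are assumptions, not existing Flink code):
{code}
// Hypothetical adapter: builds the proposed Partitioner.Context from the values
// StreamPartitioner#setup would receive and passes it to the user partitioner.
class PartitionerContextAdapter {
    static <K> void initialize(
            Partitioner<K> partitioner,
            final int numberOfChannels,
            final TaskIOMetricGroup metrics) {
        partitioner.init(
                new Partitioner.Context() {
                    @Override
                    public int numberOfChannels() {
                        return numberOfChannels;
                    }

                    @Override
                    public TaskIOMetricGroup metrics() {
                        return metrics;
                    }
                });
    }
}
{code}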



> Expose TaskIOMetricGroup to custom Partitioner
> --
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.4
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg 
> sink and want to publish some counter metrics for certain scenarios, similar 
> to the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from 
> `DataStream`:
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
> `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
> public `Partitioner` interface, where we can implement the custom range 
> partitioner.
> The `Partitioner` interface is a functional interface today. We can add a new 
> default `setup` method without breaking backward compatibility:
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     *default void setup(TaskIOMetricGroup metrics) {}*
>     int partition(K key, int numPartitions);
> }
> {code}
> I know a public interface change requires a FLIP process; I will start one if 
> the community agrees with this feature request.
> Personally, I think `numPartitions` should be passed in the `setup` method 
> too, but that is a breaking change that is NOT worth the benefit right now:
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method, which we 
> would need to modify to pass the metrics group:
> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
>         Serializable {
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner

2024-05-16 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847116#comment-17847116
 ] 

Steven Zhen Wu edited comment on FLINK-35384 at 5/17/24 12:06 AM:
--

One potential risk of this type of API is that it is not extensible: if we want 
to pass another argument to the partitioner, we have to break compatibility or 
add yet another method.
{code}
default void setup(TaskIOMetricGroup metrics) {}
{code}

Maybe we can move to the context model that is widely used in Flink:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    int partition(K key, int numPartitions);

    default void init(Context context) {}

    interface Context {
        int numberOfChannels();
        TaskIOMetricGroup metrics();
    }
}
{code}




was (Author: stevenz3wu):
One potential risk of this type of API is that it is not extensible: if we want 
to pass another argument to the partitioner, we have to break compatibility or 
add yet another method.
{code}
default void setup(TaskIOMetricGroup metrics) {}
{code}

Maybe we can move to the context model that is widely used in Flink:
{code}
@Public
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
    int partition(K key, int numPartitions);

    default void init(Context context) {}

    interface Context {
        int numberOfChannels();
        TaskIOMetricGroup metrics();
    }
}
{code}



> Expose TaskIOMetricGroup to custom Partitioner
> --
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.4
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg 
> sink and want to publish some counter metrics for certain scenarios, similar 
> to the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from 
> `DataStream`:
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
> `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
> public `Partitioner` interface, where we can implement the custom range 
> partitioner.
> The `Partitioner` interface is a functional interface today. We can add a new 
> default `setup` method without breaking backward compatibility:
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     *default void setup(TaskIOMetricGroup metrics) {}*
>     int partition(K key, int numPartitions);
> }
> {code}
> I know a public interface change requires a FLIP process; I will start one if 
> the community agrees with this feature request.
> Personally, I think `numPartitions` should be passed in the `setup` method 
> too, but that is a breaking change that is NOT worth the benefit right now:
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method, which we 
> would need to modify to pass the metrics group:
> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
>         Serializable {
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35384) Expose TaskIOMetricGroup to custom Partitioner via init Context

2024-05-16 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-35384:
---
Summary: Expose TaskIOMetricGroup to custom Partitioner via init Context  
(was: Expose TaskIOMetricGroup to custom Partitioner)

> Expose TaskIOMetricGroup to custom Partitioner via init Context
> ---
>
> Key: FLINK-35384
> URL: https://issues.apache.org/jira/browse/FLINK-35384
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.4
>Reporter: Steven Zhen Wu
>Priority: Major
>
> I am trying to implement a custom range partitioner in the Flink Iceberg 
> sink and want to publish some counter metrics for certain scenarios, similar 
> to the network metrics exposed in `TaskIOMetricGroup`.
> We can implement the range partitioner using the public interface from 
> `DataStream`:
> {code}
> public <K> DataStream<T> partitionCustom(
>         Partitioner<K> partitioner, KeySelector<T, K> keySelector)
> {code}
> We can pass the `TaskIOMetricGroup` to the `StreamPartitioner` that 
> `CustomPartitionerWrapper` extends from. `CustomPartitionerWrapper` wraps the 
> public `Partitioner` interface, where we can implement the custom range 
> partitioner.
> The `Partitioner` interface is a functional interface today. We can add a new 
> default `setup` method without breaking backward compatibility:
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     *default void setup(TaskIOMetricGroup metrics) {}*
>     int partition(K key, int numPartitions);
> }
> {code}
> I know a public interface change requires a FLIP process; I will start one if 
> the community agrees with this feature request.
> Personally, I think `numPartitions` should be passed in the `setup` method 
> too, but that is a breaking change that is NOT worth the benefit right now:
> {code}
> @Public
> @FunctionalInterface
> public interface Partitioner<K> extends java.io.Serializable, Function {
>     default void setup(int numPartitions, TaskIOMetricGroup metrics) {}
>     int partition(K key);
> }
> {code}
> That would be similar to the `StreamPartitioner#setup()` method, which we 
> would need to modify to pass the metrics group:
> {code}
> @Internal
> public abstract class StreamPartitioner<T>
>         implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, 
>         Serializable {
>     @Override
>     public void setup(int numberOfChannels, TaskIOMetricGroup metrics) {
>         this.numberOfChannels = numberOfChannels;
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-9693) possible memory leak in jobmanager retaining archived checkpoints

2018-06-29 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-9693:
-

 Summary: possible memory leak in jobmanager retaining archived 
checkpoints
 Key: FLINK-9693
 URL: https://issues.apache.org/jira/browse/FLINK-9693
 Project: Flink
  Issue Type: Bug
  Components: JobManager, State Backends, Checkpointing
 Environment: !image.png!!image (1).png!
Reporter: Steven Zhen Wu
 Attachments: image (1).png, image.png

First, some context about the job
* Flink 1.4.1
* embarrassingly parallel: all operators are chained together
* parallelism is over 1,000
* stateless except for the Kafka source operators; checkpoint size is 8.4 MB
* "state.backend.fs.memory-threshold" is set so that only the jobmanager writes 
checkpoint data to S3
* internal checkpointing with 10 checkpoints retained in history

Summary of the observations
* 41,567 ExecutionVertex objects retained 9+ GB of memory
* Expanded one ExecutionVertex: it seems to be storing the Kafka offsets for the 
source operator
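
For reference, the checkpoint-related settings described above correspond 
roughly to the following flink-conf.yaml entries (the values shown here are 
illustrative, not our exact production values):
{code}
# State below this size is stored inline in the checkpoint metadata, so only
# the jobmanager writes the checkpoint files to S3.
state.backend.fs.memory-threshold: 1048576

# Number of completed checkpoints retained in history.
state.checkpoints.num-retained: 10
{code}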



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9693) possible memory leak in jobmanager retaining archived checkpoints

2018-06-29 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-9693:
--
Attachment: (was: image.png)

> possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Priority: Major
>
> First, some context about the job
>  * Flink 1.4.1
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9693) possible memory leak in jobmanager retaining archived checkpoints

2018-06-29 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-9693:
--
Attachment: (was: image (1).png)

> possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Priority: Major
>
> First, some context about the job
>  * Flink 1.4.1
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9693) possible memory leak in jobmanager retaining archived checkpoints

2018-06-29 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-9693:
--
Description: 
First, some context about the job
 * Flink 1.4.1
 * embarrassingly parallel: all operators are chained together
 * parallelism is over 1,000
 * stateless except for the Kafka source operators; checkpoint size is 8.4 MB
 * "state.backend.fs.memory-threshold" is set so that only the jobmanager writes 
checkpoint data to S3
 * internal checkpointing with 10 checkpoints retained in history

Summary of the observations
 * 41,567 ExecutionVertex objects retained 9+ GB of memory
 * Expanded one ExecutionVertex: it seems to be storing the Kafka offsets for 
the source operator

  was:
First, some context about the job
* Flink 1.4.1
* embarrassingly parallel: all operators are chained together
* parallelism is over 1,000
* stateless except for Kafka source operators. checkpoint size is 8.4 MB.
* set "state.backend.fs.memory-threshold" so that only jobmanager writes to S3 
to checkpoint
* internal checkpoint with 10 checkpoints retained in history
 
Summary of the observations
* 41,567 ExecutionVertex objects retained 9+ GB of memory
* Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
source operator


> possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Priority: Major
>
> First, some context about the job
>  * Flink 1.4.1
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9693) possible memory leak in jobmanager retaining archived checkpoints

2018-06-29 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-9693:
--
Attachment: 41K_ExecutionVertex_objs_retained_9GB.png

> possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: 41K_ExecutionVertex_objs_retained_9GB.png, 
> ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9693) possible memory leak in jobmanager retaining archived checkpoints

2018-06-29 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-9693:
--
Attachment: ExecutionVertexZoomIn.png

> possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: 41K_ExecutionVertex_objs_retained_9GB.png, 
> ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9693) possible memory leak in jobmanager retaining archived checkpoints

2018-06-29 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-9693:
--
Description: 
First, some context about the job
 * Flink 1.4.1
 * stand-alone deployment mode
 * embarrassingly parallel: all operators are chained together
 * parallelism is over 1,000
 * stateless except for the Kafka source operators; checkpoint size is 8.4 MB
 * "state.backend.fs.memory-threshold" is set so that only the jobmanager writes 
checkpoint data to S3
 * internal checkpointing with 10 checkpoints retained in history

Summary of the observations
 * 41,567 ExecutionVertex objects retained 9+ GB of memory
 * Expanded one ExecutionVertex: it seems to be storing the Kafka offsets for 
the source operator

  was:
First, some context about the job
 * Flink 1.4.1
 * embarrassingly parallel: all operators are chained together
 * parallelism is over 1,000
 * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
 * set "state.backend.fs.memory-threshold" so that only jobmanager writes to S3 
to checkpoint
 * internal checkpoint with 10 checkpoints retained in history

 

Summary of the observations
 * 41,567 ExecutionVertex objects retained 9+ GB of memory
 * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
source operator


> possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: 41K_ExecutionVertex_objs_retained_9GB.png, 
> ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-09 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16537514#comment-16537514
 ] 

Steven Zhen Wu commented on FLINK-9693:
---

[~till.rohrmann] is it possible to generate a patch for 1.4? It doesn't seem 
straightforward to backport PR 6251 to the 1.4 branch.

> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.1, 1.6.0
>
> Attachments: 41K_ExecutionVertex_objs_retained_9GB.png, 
> ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10774) connection leak when partition discovery is disabled and open throws exception

2018-11-04 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-10774:
--

 Summary: connection leak when partition discovery is disabled and 
open throws exception
 Key: FLINK-10774
 URL: https://issues.apache.org/jira/browse/FLINK-10774
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.6.2, 1.5.5, 1.4.2
Reporter: Steven Zhen Wu
Assignee: Steven Zhen Wu


Here is the scenario to reproduce the issue:
 * partition discovery is disabled
 * the open method throws an exception (e.g. when the broker's SSL authorization 
denies the request)

In this scenario, the run method won't be executed. As a result, 
_partitionDiscoverer.close()_ won't be called, which causes a connection leak 
because the KafkaConsumer is initialized but never closed. This caused an outage 
that brought down our Kafka cluster when a high-parallelism job got into a 
restart/failure loop.
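
A minimal sketch of the guard that avoids the leak (illustrative only; the names 
are simplified stand-ins, not the actual FlinkKafkaConsumerBase code or the 
eventual patch):
{code}
// If initialization fails after the discoverer has been created, close it so
// the underlying KafkaConsumer is not leaked.
class SourceOpenGuard {
    interface PartitionDiscoverer extends AutoCloseable {
        void open() throws Exception;
    }

    void open(PartitionDiscoverer partitionDiscoverer) throws Exception {
        try {
            partitionDiscoverer.open(); // may throw, e.g. SSL authorization denied
        } catch (Exception e) {
            partitionDiscoverer.close(); // without this, the consumer leaks
            throw e;
        }
    }
}
{code}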



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10774) connection leak when partition discovery is disabled and open throws exception

2018-11-04 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674471#comment-16674471
 ] 

Steven Zhen Wu commented on FLINK-10774:


[~srichter]  FYI. I will submit a patch soon

> connection leak when partition discovery is disabled and open throws exception
> --
>
> Key: FLINK-10774
> URL: https://issues.apache.org/jira/browse/FLINK-10774
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2, 1.5.5, 1.6.2
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
>Priority: Major
>
> Here is the scenario to reproduce the issue:
>  * partition discovery is disabled
>  * the open method throws an exception (e.g. when the broker's SSL 
> authorization denies the request)
> In this scenario, the run method won't be executed. As a result, 
> _partitionDiscoverer.close()_ won't be called, which causes a connection leak 
> because the KafkaConsumer is initialized but never closed. This caused an 
> outage that brought down our Kafka cluster when a high-parallelism job got 
> into a restart/failure loop.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10360) support timeout in savepoint REST api

2018-09-17 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-10360:
--

 Summary: support timeout in savepoint REST api
 Key: FLINK-10360
 URL: https://issues.apache.org/jira/browse/FLINK-10360
 Project: Flink
  Issue Type: Improvement
  Components: REST, State Backends, Checkpointing
Reporter: Steven Zhen Wu


Right now, savepoints share the same timeout config as checkpoints. With 
incremental checkpointing, we only need to configure a timeout that is 
reasonable for the small delta. A savepoint, however, is a full snapshot, and 
the checkpoint timeout may be too small for it.

It would bring more flexibility if the savepoint REST API supported an 
additional timeout param and applied it as the actual savepoint timeout.
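
For illustration, the request could look roughly like this (the trigger endpoint 
and the `target-directory`/`cancel-job` fields already exist; the `timeout` 
field, in seconds here, is the hypothetical addition proposed above):
{code}
POST /jobs/:jobid/savepoints
{
  "target-directory": "s3://bucket/savepoints",
  "cancel-job": false,
  "timeout": 1800
}
{code}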



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10360) support timeout in savepoint REST api

2018-09-18 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-10360:
---
Description: 
Right now, savepoints share the same timeout config as checkpoints. With 
incremental checkpointing, we only need to configure a timeout that is 
reasonable for the small delta. A savepoint, however, is a full snapshot, and 
the checkpoint timeout may be too small for it.
 * incremental checkpoint size ranges from 100 GB to 400 GB and takes 1-4 minutes
 * a full savepoint is ~4.5 TB and takes ~25 minutes

It would bring more flexibility if the savepoint REST API supported an 
additional timeout param and applied it as the actual savepoint timeout.

  was:
right now, savepoint share the same timeout config as checkpoint. With 
incremental checkpoint, we just need to configure the timeout to be reasonable 
for the small delta. With savepoint, it is a full snapshot and maybe the 
checkpoint timeout is too small for savepoint.

It will bring more flexibility if the savepoint REST api supports an additional 
timeout param, and apply it for actual savepoint timeout.


> support timeout in savepoint REST api
> -
>
> Key: FLINK-10360
> URL: https://issues.apache.org/jira/browse/FLINK-10360
> Project: Flink
>  Issue Type: Improvement
>  Components: REST, State Backends, Checkpointing
>Reporter: Steven Zhen Wu
>Priority: Major
>
> right now, savepoint share the same timeout config as checkpoint. With 
> incremental checkpoint, we just need to configure the timeout to be 
> reasonable for the small delta. With savepoint, it is a full snapshot and 
> maybe the checkpoint timeout is too small for savepoint.
>  * incremental checkpoint size ranges from 100 GB - 400 GB, and took 1 mins - 
> 4 mins
>  * full savepoint is ~4.5 TB and took ~25 mins
> It will bring more flexibility if the savepoint REST api supports an 
> additional timeout param, and apply it for actual savepoint timeout.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8042) retry individual failover-strategy for some time first before reverting to full job restart

2017-11-09 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-8042:
-

 Summary: retry individual failover-strategy for some time first 
before reverting to full job restart
 Key: FLINK-8042
 URL: https://issues.apache.org/jira/browse/FLINK-8042
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager
Affects Versions: 1.3.2
Reporter: Steven Zhen Wu


Let's say we kill a taskmanager node. When Flink attempts fine-grained recovery 
and fails because the replacement taskmanager node didn't come back in time, it 
reverts to a full job restart.

Stephan and Till were suggesting that Flink can/should retry fine-grained 
recovery for some time before giving up and reverting to a full job restart.
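
For context, the behavior described above is driven by configuration along 
these lines (a sketch only; the failover-strategy value names differ across 
Flink versions, and the restart values are illustrative):
{code}
# Fine-grained (per-task) failover instead of always restarting the whole job.
jobmanager.execution.failover-strategy: individual

# Restart behavior that applies once recovery falls back to a full job restart.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30 s
{code}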



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8043) increment job restart metric when fine grained recovery reverted to full job restart

2017-11-09 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-8043:
-

 Summary: increment job restart metric when fine grained recovery 
reverted to full job restart
 Key: FLINK-8043
 URL: https://issues.apache.org/jira/browse/FLINK-8043
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager
Affects Versions: 1.3.2
Reporter: Steven Zhen Wu


When fine-grained recovery fails (e.g. due to not enough taskmanager slots 
because the replacement taskmanager node didn't come back in time), Flink will 
revert to a full job restart. In this case, it should also increment the "job 
restart" metric.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8043) change fullRestarts (for fine grained recovery) from gauge to counter

2017-12-24 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-8043:
--
Description: 
Fine-grained recovery publishes fullRestarts as a gauge, which is not suitable 
for threshold-based alerting. Usually we would alert on something like 
"fullRestarts > 0 happens 10 times in the last 15 minutes".

In comparison, "task_failures" is published as a counter.
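
For illustration, the difference between the two registration styles in Flink's 
metrics API (a sketch; the metric names and the backing field are placeholders):
{code}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

class RestartMetricsSketch {
    private long numberOfFullRestarts;

    void register(MetricGroup group) {
        // Counter: reporters see monotonically increasing increments, which are
        // easy to turn into rates and threshold alerts.
        Counter restartCounter = group.counter("fullRestartsCounter");
        restartCounter.inc();

        // Gauge: reporters only see the current value sampled at report time.
        group.gauge("fullRestartsGauge", (Gauge<Long>) () -> numberOfFullRestarts);
    }
}
{code}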

  was:When fine-grained recovery fails (e.g. due to not enough taskmanager 
slots because the replacement taskmanager node didn't come back in time), Flink 
will revert to a full job restart. In this case, it should also increment the 
"job restart" metric.

Summary: change fullRestarts (for fine grained recovery) from gauge to 
counter  (was: increment job restart metric when fine grained recovery reverted 
to full job restart)

> change fullRestarts (for fine grained recovery) from gauge to counter
> -
>
> Key: FLINK-8043
> URL: https://issues.apache.org/jira/browse/FLINK-8043
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.3.2
>Reporter: Steven Zhen Wu
>
> Fine-grained recovery publishes fullRestarts as a gauge, which is not suitable 
> for threshold-based alerting. Usually we would alert on something like 
> "fullRestarts > 0 happens 10 times in the last 15 minutes".
> In comparison, "task_failures" is published as a counter.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8043) change fullRestarts (for fine grained recovery) from gauge to counter

2018-01-03 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16310567#comment-16310567
 ] 

Steven Zhen Wu commented on FLINK-8043:
---

[~trohrm...@apache.org] any objection/comment on changing this metric from a 
gauge to a counter?

Also, the metric naming seems inconsistent: "fullRestarts" vs. "task_failures".

> change fullRestarts (for fine grained recovery) from gauge to counter
> -
>
> Key: FLINK-8043
> URL: https://issues.apache.org/jira/browse/FLINK-8043
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.3.2
>Reporter: Steven Zhen Wu
>
> Fine-grained recovery publishes fullRestarts as a gauge, which is not suitable 
> for threshold-based alerting. Usually we would alert on something like 
> "fullRestarts > 0 happens 10 times in the last 15 minutes".
> In comparison, "task_failures" is published as a counter.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-9061) add entropy to s3 path for better scalability

2018-05-02 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu reassigned FLINK-9061:
-

Assignee: Indrajit Roychoudhury
 Summary: add entropy to s3 path for better scalability  (was: S3 
checkpoint data not partitioned well -- causes errors and poor performance)

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
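
For illustration, the rewrite described above could look roughly like this (a 
sketch; the hash length and placement are arbitrary choices, not the 
entropy-injection scheme Flink eventually shipped):
{code}
import java.nio.charset.StandardCharsets;
import java.util.UUID;

class EntropyPath {
    // Derive a stable prefix from a hash of the original key, as described above,
    // e.g. s3://bucket/flink/checkpoints -> s3://bucket/1a2b3c4d/flink/checkpoints.
    static String addEntropy(String bucket, String keyPrefix) {
        String hash =
                UUID.nameUUIDFromBytes(keyPrefix.getBytes(StandardCharsets.UTF_8))
                        .toString()
                        .substring(0, 8);
        return "s3://" + bucket + "/" + hash + "/" + keyPrefix;
    }
}
{code}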



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-25 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu reopened FLINK-9693:
---

Till, I cherry-picked your fix onto the 1.4 branch, and we are still seeing the 
memory leak issue. I will attach another screenshot.

> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.1, 1.6.0
>
> Attachments: 41K_ExecutionVertex_objs_retained_9GB.png, 
> ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-25 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-9693:
--
Attachment: 20180725_jm_mem_leak.png

> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.1, 1.6.0
>
> Attachments: 20180725_jm_mem_leak.png, 
> 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-25 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556325#comment-16556325
 ] 

Steven Zhen Wu commented on FLINK-9693:
---

One more observation: we are seeing this issue right after the jobmanager node 
gets killed and replaced. However, it is not reproducible when I try to kill the 
jobmanager while the job is healthy.

> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.1, 1.6.0
>
> Attachments: 20180725_jm_mem_leak.png, 
> 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-25 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556325#comment-16556325
 ] 

Steven Zhen Wu edited comment on FLINK-9693 at 7/25/18 9:56 PM:


One more observation: we are seeing this issue right after the jobmanager node 
gets killed and replaced. However, it is not reproducible when I try to kill the 
jobmanager while the job is healthy, so we still don't know exactly what 
conditions trigger the issue.


was (Author: stevenz3wu):
One more observation: we are seeing this issue right after the jobmanager node 
got killed and replaced. However, it is not reproducible when I try to kill 
the jobmanager while the job is healthy.

> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.1, 1.6.0
>
> Attachments: 20180725_jm_mem_leak.png, 
> 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-25 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556325#comment-16556325
 ] 

Steven Zhen Wu edited comment on FLINK-9693 at 7/25/18 11:23 PM:
-

We can actually reproduce the issue by killing the jobmanager node for very 
large jobs, e.g. parallelism over 1,000. The issue starts to appear when the 
replacement jobmanager node comes up.


was (Author: stevenz3wu):
One more observation: we are seeing this issue right after the jobmanager node 
got killed and replaced. However, it is not reproducible when I try to kill 
the jobmanager while the job is healthy, so we still don't know what exact 
conditions trigger the issue.

> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.1, 1.6.0
>
> Attachments: 20180725_jm_mem_leak.png, 
> 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-25 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556325#comment-16556325
 ] 

Steven Zhen Wu edited comment on FLINK-9693 at 7/26/18 5:15 AM:


We can actually reproduce the issue by killing the jobmanager node for very 
large jobs, e.g. parallelism over 1,000. The issue starts to appear when the 
replacement jobmanager node comes up.

Another observation is that the ~10 GB memory leak seems to happen very 
quickly (in less than a few minutes).


was (Author: stevenz3wu):
We can actually reproduce the issue by killing the jobmanager node for very 
large jobs, e.g. parallelism over 1,000. The issue starts to appear when the 
replacement jobmanager node comes up.

> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.1, 1.6.0
>
> Attachments: 20180725_jm_mem_leak.png, 
> 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-07-26 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556325#comment-16556325
 ] 

Steven Zhen Wu edited comment on FLINK-9693 at 7/27/18 12:57 AM:
-

We can actually reproduce the issue by killing the jobmanager node for very 
large jobs, e.g. parallelism over 1,000. The issue starts to appear when the 
replacement jobmanager node comes up.

Another observation is that the ~10 GB memory leak seems to happen very 
quickly (in less than a few minutes).

A few words about our setup:
 * standalone cluster
 * HA mode using ZooKeeper
 * single jobmanager; we also tried a two-jobmanager setup, same issue

 


was (Author: stevenz3wu):
We can actually reproduce the issue by killing the jobmanager node for very 
large jobs, e.g. parallelism over 1,000. The issue starts to appear when the 
replacement jobmanager node comes up.

Another observation is that the ~10 GB memory leak seems to happen very 
quickly (in less than a few minutes).

> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.1, 1.6.0
>
> Attachments: 20180725_jm_mem_leak.png, 
> 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-08-11 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu closed FLINK-9693.
-
Resolution: Fixed

Closing this jira since [~srichter] has addressed the remaining issues in other jiras.

> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.3, 1.6.1, 1.7.0
>
> Attachments: 20180725_jm_mem_leak.png, 
> 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11195) Extend AbstractS3FileSystemFactory.createHadoopFileSystem to accept URI and Hadoop Configuration

2019-01-16 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16744346#comment-16744346
 ] 

Steven Zhen Wu commented on FLINK-11195:


[~StephanEwen] can you take a look at this Jira and PR from [~markcho]?

> Extend AbstractS3FileSystemFactory.createHadoopFileSystem to accept URI and 
> Hadoop Configuration
> 
>
> Key: FLINK-11195
> URL: https://issues.apache.org/jira/browse/FLINK-11195
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.7.0
>Reporter: Mark Cho
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, `createHadoopFileSystem` method does not take any parameters.
> In order to delegate FileSystem creation to Hadoop FileSystem.get method, we 
> need to pass URI and Hadoop Configuration to this abstract method.
> We use a custom version of PrestoS3FileSystem by plugging our 
> FileSystemFactory similar to `flink-filesystems/flink-s3-fs-presto` project. 
> However, we would like to delegate our FS creation to Hadoop.
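
For context, a minimal sketch of the Hadoop-side call that the description proposes 
delegating to; the wrapper class and method shown here are illustrative only and not 
the actual Flink factory API being changed in the linked PR:

{code:java}
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Illustrative only: the Hadoop FileSystem.get(URI, Configuration) call that the issue
// wants a createHadoopFileSystem-style factory method to be able to delegate to.
public class HadoopFsDelegationSketch {

    static FileSystem createHadoopFileSystem(URI fsUri, Configuration hadoopConfig) throws Exception {
        // Hadoop resolves the concrete FileSystem implementation from the URI scheme
        // (e.g. s3a://) plus the supplied Hadoop configuration.
        return FileSystem.get(fsUri, hadoopConfig);
    }
}
{code}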



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11196) Extend S3 EntropyInjector to use key replacement (instead of key removal) when creating checkpoint metadata files

2019-01-16 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16744345#comment-16744345
 ] 

Steven Zhen Wu commented on FLINK-11196:


[~StephanEwen] can you take a look at the Jira and PR from [~markcho]?

> Extend S3 EntropyInjector to use key replacement (instead of key removal) 
> when creating checkpoint metadata files
> -
>
> Key: FLINK-11196
> URL: https://issues.apache.org/jira/browse/FLINK-11196
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem
>Affects Versions: 1.7.0
>Reporter: Mark Cho
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We currently use S3 entropy injection when writing out checkpoint data.
> We also use external checkpoints so that we can resume from a checkpoint 
> metadata file later.
> The current implementation of S3 entropy injector makes it difficult to 
> locate the checkpoint metadata files since in the newer versions of Flink, 
> `state.checkpoints.dir` configuration controls where the metadata and state 
> files are written, instead of having two separate paths (one for metadata, 
> one for state files).
> With entropy injection, we replace the entropy marker in the path specified 
> by `state.checkpoints.dir` with entropy (for state files) or we strip out the 
> marker (for metadata files).
>  
> We need to extend the entropy injection so that we can replace the entropy 
> marker with a predictable path (instead of removing it) so that we can do a 
> prefix query for just the metadata files.
> By not using the entropy key replacement (defaults to empty string), you get 
> the same behavior as it is today (entropy marker removed).
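
To make the layout concrete, here is a sketch using the documented S3 entropy options; 
the bucket name, entropy value, and proposed replacement string below are made-up 
examples, not part of the issue or PR:

{code}
# flink-conf.yaml (sketch; bucket and values are placeholders)
state.checkpoints.dir: s3://my-bucket/_entropy_/checkpoints
s3.entropy.key: _entropy_
s3.entropy.length: 4

# Today:
#   state files    -> s3://my-bucket/a1b2/checkpoints/...    (marker replaced with random entropy)
#   metadata files -> s3://my-bucket/checkpoints/...         (marker stripped)
# Proposed:
#   metadata files -> s3://my-bucket/metadata/checkpoints/... (marker replaced with a fixed,
#                     predictable string so metadata files can be found with one prefix query)
{code}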



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12368) add subtask index to FlinkKafkaConsumerBase logging, which can be very useful when debugging problem

2019-04-29 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-12368:
--

 Summary: add subtask index to FlinkKafkaConsumerBase logging, 
which can be very useful when debugging problem
 Key: FLINK-12368
 URL: https://issues.apache.org/jira/browse/FLINK-12368
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.8.0
Reporter: Steven Zhen Wu
Assignee: Steven Zhen Wu
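
As background, the subtask index is already available to any rich parallel source via 
its runtime context; a minimal sketch of the kind of logging the issue asks for 
(illustrative only, not the actual FlinkKafkaConsumerBase patch):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: a parallel source that tags its log lines with the subtask index,
// the same information FLINK-12368 wants in FlinkKafkaConsumerBase logging.
public class SubtaskAwareSource extends RichParallelSourceFunction<String> {

    private static final Logger LOG = LoggerFactory.getLogger(SubtaskAwareSource.class);
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int total = getRuntimeContext().getNumberOfParallelSubtasks();
        LOG.info("Consumer subtask {}/{} starting up", subtask, total);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            Thread.sleep(1000L); // placeholder work; a real consumer would poll Kafka here
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}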






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12368) add subtask index to FlinkKafkaConsumerBase logging, which can be very useful when debugging problem

2019-05-07 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu closed FLINK-12368.
--
   Resolution: Fixed
Fix Version/s: 1.9.0

Stephan merged the PR

> add subtask index to FlinkKafkaConsumerBase logging, which can be very useful 
> when debugging problem
> 
>
> Key: FLINK-12368
> URL: https://issues.apache.org/jira/browse/FLINK-12368
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.8.0
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12619) Support TERMINATE/SUSPEND Job with Checkpoint

2019-06-06 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16857778#comment-16857778
 ] 

Steven Zhen Wu commented on FLINK-12619:


It would be very useful to support an optimized/incremental/lightweight snapshot 
(checkpoint or savepoint) during stop. E.g. it can help with the redeployment case 
to minimize/avoid duplicates for non-exactly-once sinks, where the parallelism and 
state backend remain the same.

> Support TERMINATE/SUSPEND Job with Checkpoint
> -
>
> Key: FLINK-12619
> URL: https://issues.apache.org/jira/browse/FLINK-12619
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Inspired by the idea of FLINK-11458, we propose to support terminate/suspend 
> a job with checkpoint. This improvement cooperates with incremental and 
> external checkpoint features, that if checkpoint is retained and this feature 
> is configured, we will trigger a checkpoint before the job stops. It could 
> accelarate job recovery a lot since:
> 1. No source rewinding required any more.
> 2. It's much faster than taking a savepoint since incremental checkpoint is 
> enabled.
> Please note that conceptually savepoints is different from checkpoint in a 
> similar way that backups are different from recovery logs in traditional 
> database systems. So we suggest using this feature only for job recovery, 
> while stick with FLINK-11458 for the 
> upgrading/cross-cluster-job-migration/state-backend-switch cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12781) run job REST api doesn't return complete stack trace for start job failure

2019-06-07 Thread Steven Zhen Wu (JIRA)
Steven Zhen Wu created FLINK-12781:
--

 Summary: run job REST api doesn't return complete stack trace for 
start job failure
 Key: FLINK-12781
 URL: https://issues.apache.org/jira/browse/FLINK-12781
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Reporter: Steven Zhen Wu


We use REST api to start a job in Flink cluster.

[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run]
 

 

When there is exception during job construction, the response payload doesn't 
contain the full stack trace. 

```

{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error."]}

```

This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
because stack trace is completely lost now. FLINK-11134 is doing the right 
thing. But we need the response payload to contain the full stack trace.
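
For reference, a small reproduction sketch against the jar-run endpoint; the host, port, 
and jar id are placeholders, and this is not part of the proposed fix:

{code:java}
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Illustrative only: submits a previously uploaded jar via POST /jars/:jarid/run and prints
// the response body, which on failure currently carries just the one-line error message.
public class RunJarViaRest {

    public static void main(String[] args) throws Exception {
        String jarId = args.length > 0 ? args[0] : "REPLACE-WITH-UPLOADED-JAR-ID";
        URL url = new URL("http://localhost:8081/jars/" + jarId + "/run");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");

        int status = conn.getResponseCode();
        InputStream body = status >= 400 ? conn.getErrorStream() : conn.getInputStream();
        byte[] bytes = body.readAllBytes(); // requires Java 9+
        // Expected failure payload today: {"errors":["...ProgramInvocationException: The main
        // method caused an error."]} -- with no stack trace attached.
        System.out.println(status + " " + new String(bytes, StandardCharsets.UTF_8));
    }
}
{code}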



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12781) run job REST api doesn't return complete stack trace for start job failure

2019-06-07 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-12781:
---
Description: 
We use REST api to start a job in Flink cluster.
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run

When there is exception during job construction, the response payload doesn't 
contain the full stack trace. 
{code}
{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error."]}
{code}

This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
because stack trace is completely lost now. FLINK-11134 is doing the right 
thing. But we need the response payload to contain the full stack trace.

  was:
We use REST api to start a job in Flink cluster.

[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run]
 

 

When there is exception during job construction, the response payload doesn't 
contain the full stack trace. 

{code}

{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error."]}

{code}

This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
because stack trace is completely lost now. FLINK-11134 is doing the right 
thing. But we need the response payload to contain the full stack trace.


> run job REST api doesn't return complete stack trace for start job failure
> --
>
> Key: FLINK-12781
> URL: https://issues.apache.org/jira/browse/FLINK-12781
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Steven Zhen Wu
>Priority: Major
>
> We use REST api to start a job in Flink cluster.
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run
> When there is exception during job construction, the response payload doesn't 
> contain the full stack trace. 
> {code}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error."]}
> {code}
> This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
> because stack trace is completely lost now. FLINK-11134 is doing the right 
> thing. But we need the response payload to contain the full stack trace.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12781) run job REST api doesn't return complete stack trace for start job failure

2019-06-07 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-12781:
---
Description: 
We use REST api to start a job in Flink cluster.

[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run]
 

 

When there is exception during job construction, the response payload doesn't 
contain the full stack trace. 

{code}

{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error."]}

{code}

This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
because stack trace is completely lost now. FLINK-11134 is doing the right 
thing. But we need the response payload to contain the full stack trace.

  was:
We use REST api to start a job in Flink cluster.

[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run]
 

 

When there is exception during job construction, the response payload doesn't 
contain the full stack trace. 

```

{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error."]}

```

This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
because stack trace is completely lost now. FLINK-11134 is doing the right 
thing. But we need the response payload to contain the full stack trace.


> run job REST api doesn't return complete stack trace for start job failure
> --
>
> Key: FLINK-12781
> URL: https://issues.apache.org/jira/browse/FLINK-12781
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Steven Zhen Wu
>Priority: Major
>
> We use REST api to start a job in Flink cluster.
> [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run]
>  
>  
> When there is exception during job construction, the response payload doesn't 
> contain the full stack trace. 
> {code}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error."]}
> {code}
> This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
> because stack trace is completely lost now. FLINK-11134 is doing the right 
> thing. But we need the response payload to contain the full stack trace.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12781) run job REST api doesn't return complete stack trace for start job failure

2019-06-07 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-12781:
---
Description: 
We use REST api to start a job in Flink cluster.
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run

When there is exception during job construction, the response payload doesn't 
contain the full stack trace. 
{code}
{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error."]}
{code}

This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
because stack trace is completely lost now. FLINK-11134 is doing the right 
thing. We just need the response payload to contain the full stack trace, which 
has always been an issue/fix.

  was:
We use REST api to start a job in Flink cluster.
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run

When there is exception during job construction, the response payload doesn't 
contain the full stack trace. 
{code}
{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error."]}
{code}

This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
because stack trace is completely lost now. FLINK-11134 is doing the right 
thing. But we need the response payload to contain the full stack trace.


> run job REST api doesn't return complete stack trace for start job failure
> --
>
> Key: FLINK-12781
> URL: https://issues.apache.org/jira/browse/FLINK-12781
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Steven Zhen Wu
>Priority: Major
>
> We use REST api to start a job in Flink cluster.
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run
> When there is exception during job construction, the response payload doesn't 
> contain the full stack trace. 
> {code}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error."]}
> {code}
> This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
> because stack trace is completely lost now. FLINK-11134 is doing the right 
> thing. We just need the response payload to contain the full stack trace, 
> which has always been an issue/fix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12781) run job REST api doesn't return complete stack trace for start job failure

2019-06-07 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-12781:
---
Description: 
We use REST api to start a job in Flink cluster.
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run

When there is exception during job construction, the response payload doesn't 
contain the full stack trace. 
{code}
{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error."]}
{code}

This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
because stack trace is completely lost now. FLINK-11134 is doing the right 
thing. We just need the response payload to contain the full stack trace, which 
seems to have been an issue/fix since 1.5. we know 1.4 doesn't have this issue 
and correctly return full stack trace

  was:
We use REST api to start a job in Flink cluster.
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run

When there is exception during job construction, the response payload doesn't 
contain the full stack trace. 
{code}
{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error."]}
{code}

This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
because stack trace is completely lost now. FLINK-11134 is doing the right 
thing. We just need the response payload to contain the full stack trace, which 
has always been an issue/fix.


> run job REST api doesn't return complete stack trace for start job failure
> --
>
> Key: FLINK-12781
> URL: https://issues.apache.org/jira/browse/FLINK-12781
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Steven Zhen Wu
>Priority: Major
>
> We use REST api to start a job in Flink cluster.
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run
> When there is exception during job construction, the response payload doesn't 
> contain the full stack trace. 
> {code}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error."]}
> {code}
> This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
> because stack trace is completely lost now. FLINK-11134 is doing the right 
> thing. We just need the response payload to contain the full stack trace, 
> which seems to have been an issue/fix since 1.5. we know 1.4 doesn't have 
> this issue and correctly return full stack trace



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12781) run job REST api doesn't return complete stack trace for start job failure

2019-06-07 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-12781:
---
Description: 
We use REST api to start a job in Flink cluster.
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run

When there is exception during job construction, the response payload doesn't 
contain the full stack trace. 
{code}
{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error."]}
{code}

This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
because stack trace is completely lost now. FLINK-11134 is doing the right 
thing. We just need the response payload to contain the full stack trace, which 
seems to have been an issue/fix since 1.5. we know 1.4 doesn't have this issue 
and correctly return full stack trace

on the jobmanager log, we only get
{code}
2019-06-07 17:42:40,136 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Exception 
occurred in REST handler: 
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
{code}

  was:
We use REST api to start a job in Flink cluster.
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run

When there is exception during job construction, the response payload doesn't 
contain the full stack trace. 
{code}
{"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
main method caused an error."]}
{code}

This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
because stack trace is completely lost now. FLINK-11134 is doing the right 
thing. We just need the response payload to contain the full stack trace, which 
seems to have been an issue/fix since 1.5. we know 1.4 doesn't have this issue 
and correctly return full stack trace


> run job REST api doesn't return complete stack trace for start job failure
> --
>
> Key: FLINK-12781
> URL: https://issues.apache.org/jira/browse/FLINK-12781
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Steven Zhen Wu
>Priority: Major
>
> We use REST api to start a job in Flink cluster.
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run
> When there is exception during job construction, the response payload doesn't 
> contain the full stack trace. 
> {code}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error."]}
> {code}
> This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
> because stack trace is completely lost now. FLINK-11134 is doing the right 
> thing. We just need the response payload to contain the full stack trace, 
> which seems to have been an issue/fix since 1.5. we know 1.4 doesn't have 
> this issue and correctly return full stack trace
> on the jobmanager log, we only get
> {code}
> 2019-06-07 17:42:40,136 ERROR 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Exception 
> occurred in REST handler: 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12781) REST handler should return full stack trace instead of just error msg

2019-06-07 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-12781:
---
Summary: REST handler should return full stack trace instead of just error 
msg  (was: run job REST api doesn't return complete stack trace for start job 
failure)

> REST handler should return full stack trace instead of just error msg
> -
>
> Key: FLINK-12781
> URL: https://issues.apache.org/jira/browse/FLINK-12781
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Steven Zhen Wu
>Priority: Major
>
> We use REST api to start a job in Flink cluster.
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run
> When there is exception during job construction, the response payload doesn't 
> contain the full stack trace. 
> {code}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error."]}
> {code}
> This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
> because stack trace is completely lost now. FLINK-11134 is doing the right 
> thing. We just need the response payload to contain the full stack trace, 
> which seems to have been an issue/fix since 1.5. we know 1.4 doesn't have 
> this issue and correctly return full stack trace
> on the jobmanager log, we only get
> {code}
> 2019-06-07 17:42:40,136 ERROR 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Exception 
> occurred in REST handler: 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12781) REST handler should return full stack trace instead of just error msg

2019-06-08 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu reassigned FLINK-12781:
--

Assignee: Steven Zhen Wu

> REST handler should return full stack trace instead of just error msg
> -
>
> Key: FLINK-12781
> URL: https://issues.apache.org/jira/browse/FLINK-12781
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
>Priority: Major
>
> We use REST api to start a job in Flink cluster.
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run
> When there is exception during job construction, the response payload doesn't 
> contain the full stack trace. 
> {code}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error."]}
> {code}
> This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
> because stack trace is completely lost now. FLINK-11134 is doing the right 
> thing. We just need the response payload to contain the full stack trace, 
> which seems to have been an issue/fix since 1.5. we know 1.4 doesn't have 
> this issue and correctly return full stack trace
> on the jobmanager log, we only get
> {code}
> 2019-06-07 17:42:40,136 ERROR 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Exception 
> occurred in REST handler: 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12781) REST handler should return full stack trace instead of just error msg

2019-06-08 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-12781:
---
Affects Version/s: 1.7.2

> REST handler should return full stack trace instead of just error msg
> -
>
> Key: FLINK-12781
> URL: https://issues.apache.org/jira/browse/FLINK-12781
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.7.2
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
>Priority: Major
>
> We use REST api to start a job in Flink cluster.
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run
> When there is exception during job construction, the response payload doesn't 
> contain the full stack trace. 
> {code}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error."]}
> {code}
> This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
> because stack trace is completely lost now. FLINK-11134 is doing the right 
> thing. We just need the response payload to contain the full stack trace, 
> which seems to have been an issue/fix since 1.5. we know 1.4 doesn't have 
> this issue and correctly return full stack trace
> on the jobmanager log, we only get
> {code}
> 2019-06-07 17:42:40,136 ERROR 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Exception 
> occurred in REST handler: 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-14271) Deprecate legacy RestartPipelinedRegionStrategy

2019-10-02 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16942945#comment-16942945
 ] 

Steven Zhen Wu commented on FLINK-14271:


[~zhuzh] we don't use the region failover or 
`AdaptedRestartPipelinedRegionStrategyNG`. We do use the fine-grained recovery 
feature for embarrassingly parallel jobs by setting 
`jobmanager.execution.failover-strategy: individual`

> Deprecate legacy RestartPipelinedRegionStrategy
> ---
>
> Key: FLINK-14271
> URL: https://issues.apache.org/jira/browse/FLINK-14271
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Priority: Minor
> Fix For: 1.10.0
>
>
> The legacy {{RestartPipelinedRegionStrategy}} has been superseded by 
> {{AdaptedRestartPipelinedRegionStrategyNG}} in Flink 1.9.
> It heavily depends on ExecutionGraph components and becomes a blocker for a 
> clean scheduler re-architecture.
> We should deprecate it for further removal.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14315) NPE with JobMaster.disconnectTaskManager

2019-10-02 Thread Steven Zhen Wu (Jira)
Steven Zhen Wu created FLINK-14315:
--

 Summary: NPE with JobMaster.disconnectTaskManager
 Key: FLINK-14315
 URL: https://issues.apache.org/jira/browse/FLINK-14315
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.9.0
Reporter: Steven Zhen Wu


There was a connection issue with ZooKeeper that caused the job to restart. 
But the shutdown failed with this fatal NPE, which seems to cause the JVM to exit:

{code}
2019-10-02 16:16:19,134 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Unable to 
read additional data from server sessionid 0x16d83374c4206f8, likely server has 
clo
sed socket, closing socket connection and attempting reconnect
2019-10-02 16:16:19,234 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
  - State change: SUSPENDED
2019-10-02 16:16:19,235 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2019-10-02 16:16:19,235 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2019-10-02 16:16:19,235 WARN  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - 
Connection to ZooKeeper suspended. The contender 
akka.tcp://flink@100.122.177.82:42043/u
ser/dispatcher no longer participates in the leader election.
2019-10-02 16:16:19,237 INFO  
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint- 
http://100.122.177.82:8081 lost leadership
2019-10-02 16:16:19,237 INFO  
com.netflix.spaas.runtime.resourcemanager.TitusResourceManager  - 
ResourceManager akka.tcp://flink@100.122.177.82:42043/user/resourcemanager was 
revoked leadershi
p. Clearing fencing token.
2019-10-02 16:16:19,237 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService 
/leader/e4e68f2b3fc40c7008cca624b2a2bab0/job_
manager_lock.
2019-10-02 16:16:19,237 WARN  
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
ZooKeeper connection SUSPENDING. Changes to the submitted job graphs are not 
monitored (tem
porarily).
2019-10-02 16:16:19,238 INFO  
org.apache.flink.runtime.jobmaster.JobManagerRunner   - JobManager for 
job ksrouter (e4e68f2b3fc40c7008cca624b2a2bab0) was revoked leadership at 
akka.tcp:
//flink@100.122.177.82:42043/user/jobmanager_0.
2019-10-02 16:16:19,239 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
2019-10-02 16:16:19,239 WARN  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - 
Connection to ZooKeeper suspended. The contender http://100.122.177.82:8081 no 
longer pa
rticipates in the leader election.
2019-10-02 16:16:19,239 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2019-10-02 16:16:19,239 WARN  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - 
Connection to ZooKeeper suspended. The contender 
akka.tcp://flink@100.122.177.82:42043/u
ser/jobmanager_0 no longer participates in the leader election.
2019-10-02 16:16:19,239 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2019-10-02 16:16:19,239 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job ksrouter 
(e4e68f2b3fc40c7008cca624b2a2bab0) switched from state RUNNING to SUSPENDED.
org.apache.flink.util.FlinkException: JobManager is no longer the leader.
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeJobMasterLeadership(JobManagerRunner.java:391)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.lambda$revokeLeadership$5(JobManagerRunner.java:377)
at 
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
at 
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.revokeLeadership(JobManagerRunner.java:374)
at 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.notLeader(ZooKeeperLeaderElectionService.java:247)
at 
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:640)
at 
org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.leader.LeaderLatch$8.apply(LeaderLatch.java:636)
at 
org.apache.flink.shaded.curator.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerCo

[jira] [Created] (FLINK-14316) stuck in "Job leader ... lost leadership" error

2019-10-03 Thread Steven Zhen Wu (Jira)
Steven Zhen Wu created FLINK-14316:
--

 Summary: stuck in "Job leader ... lost leadership" error
 Key: FLINK-14316
 URL: https://issues.apache.org/jira/browse/FLINK-14316
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.7.2
Reporter: Steven Zhen Wu


This is the first exception that caused the restart loop. Later exceptions are the 
same. The job seems to be stuck in this permanent failure state.

{code}
2019-10-03 21:42:46,159 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
clpevents -> device_filter -> processed_imps -> ios_processed_impression -> i
mps_ts_assigner (449/1360) (d237f5e99b6a4a580498821473763edb) switched from 
SCHEDULED to FAILED.
java.lang.Exception: Job leader for job id ecb9ad9be934edf7b1a4f7b9dd6df365 
lost leadership.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1526)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14316) stuck in "Job leader ... lost leadership" error

2019-10-04 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16944638#comment-16944638
 ] 

Steven Zhen Wu commented on FLINK-14316:


[~trohrmann] I don't know. We are just beginning to roll out 1.9. It will take 
some time for those large-state jobs to pick up 1.9.

> stuck in "Job leader ... lost leadership" error
> ---
>
> Key: FLINK-14316
> URL: https://issues.apache.org/jira/browse/FLINK-14316
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Steven Zhen Wu
>Priority: Major
>
> This is the first exception caused restart loop. Later exceptions are the 
> same. Job seems to stuck in this permanent failure state.
> {code}
> 2019-10-03 21:42:46,159 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> clpevents -> device_filter -> processed_imps -> ios_processed_impression -> i
> mps_ts_assigner (449/1360) (d237f5e99b6a4a580498821473763edb) switched from 
> SCHEDULED to FAILED.
> java.lang.Exception: Job leader for job id ecb9ad9be934edf7b1a4f7b9dd6df365 
> lost leadership.
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1526)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14316) stuck in "Job leader ... lost leadership" error

2019-10-07 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-14316:
---
Attachment: FLINK-14316.tgz

> stuck in "Job leader ... lost leadership" error
> ---
>
> Key: FLINK-14316
> URL: https://issues.apache.org/jira/browse/FLINK-14316
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: FLINK-14316.tgz
>
>
> This is the first exception caused restart loop. Later exceptions are the 
> same. Job seems to stuck in this permanent failure state.
> {code}
> 2019-10-03 21:42:46,159 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> clpevents -> device_filter -> processed_imps -> ios_processed_impression -> i
> mps_ts_assigner (449/1360) (d237f5e99b6a4a580498821473763edb) switched from 
> SCHEDULED to FAILED.
> java.lang.Exception: Job leader for job id ecb9ad9be934edf7b1a4f7b9dd6df365 
> lost leadership.
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1526)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14316) stuck in "Job leader ... lost leadership" error

2019-10-07 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16946322#comment-16946322
 ] 

Steven Zhen Wu commented on FLINK-14316:


[~trohrmann] in the uploaded tar ball, there is one TM log file; the rest are 
JM log files.

This is the line from the TM log where the TM concludes that the JM lost leadership.
```
2019-10-06 16:11:36,471 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- JobManager for 
job 3bb42eb7602c5ba25740d8360b1f0e27 with leader id 
9aa48c6a49d009f7fb287754b61d4af8 lost leadership.
```

> stuck in "Job leader ... lost leadership" error
> ---
>
> Key: FLINK-14316
> URL: https://issues.apache.org/jira/browse/FLINK-14316
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: FLINK-14316.tgz
>
>
> This is the first exception caused restart loop. Later exceptions are the 
> same. Job seems to stuck in this permanent failure state.
> {code}
> 2019-10-03 21:42:46,159 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> clpevents -> device_filter -> processed_imps -> ios_processed_impression -> i
> mps_ts_assigner (449/1360) (d237f5e99b6a4a580498821473763edb) switched from 
> SCHEDULED to FAILED.
> java.lang.Exception: Job leader for job id ecb9ad9be934edf7b1a4f7b9dd6df365 
> lost leadership.
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1526)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14316) stuck in "Job leader ... lost leadership" error

2019-10-07 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16946322#comment-16946322
 ] 

Steven Zhen Wu edited comment on FLINK-14316 at 10/7/19 11:07 PM:
--

[~trohrmann] in the uploaded tar ball, there is one TM log file; the rest are 
JM log files.

This is the line from the TM log where the TM concludes that the JM lost leadership.
{code}
2019-10-06 16:11:36,471 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- JobManager for 
job 3bb42eb7602c5ba25740d8360b1f0e27 with leader id 
9aa48c6a49d009f7fb287754b61d4af8 lost leadership.
{code}


was (Author: stevenz3wu):
[~trohrmann] in the uploaded tar ball, there is one TM log file; the rest are 
JM log files.

This is the line from the TM log where the TM concludes that the JM lost leadership.
```
2019-10-06 16:11:36,471 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- JobManager for 
job 3bb42eb7602c5ba25740d8360b1f0e27 with leader id 
9aa48c6a49d009f7fb287754b61d4af8 lost leadership.
```

> stuck in "Job leader ... lost leadership" error
> ---
>
> Key: FLINK-14316
> URL: https://issues.apache.org/jira/browse/FLINK-14316
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: FLINK-14316.tgz
>
>
> This is the first exception caused restart loop. Later exceptions are the 
> same. Job seems to stuck in this permanent failure state.
> {code}
> 2019-10-03 21:42:46,159 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> clpevents -> device_filter -> processed_imps -> ios_processed_impression -> i
> mps_ts_assigner (449/1360) (d237f5e99b6a4a580498821473763edb) switched from 
> SCHEDULED to FAILED.
> java.lang.Exception: Job leader for job id ecb9ad9be934edf7b1a4f7b9dd6df365 
> lost leadership.
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1526)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-14316) stuck in "Job leader ... lost leadership" error

2019-10-07 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16946326#comment-16946326
 ] 

Steven Zhen Wu edited comment on FLINK-14316 at 10/7/19 11:10 PM:
--

From the TM log, I also noticed that ZooKeeperLeaderRetrievalService started, 
stopped, then started again.
{code:java}
2019-10-06 16:07:32,509 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
...
2019-10-06 16:07:35,018 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
...
2019-10-06 16:07:37,546 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.

 {code}


was (Author: stevenz3wu):
From the TM log, I also noticed that ZooKeeperLeaderRetrievalService started, 
stopped, then started again.

 
{code:java}
2019-10-06 16:07:32,509 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
...
2019-10-06 16:07:35,018 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
...
2019-10-06 16:07:37,546 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.

 {code}

> stuck in "Job leader ... lost leadership" error
> ---
>
> Key: FLINK-14316
> URL: https://issues.apache.org/jira/browse/FLINK-14316
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: FLINK-14316.tgz
>
>
> This is the first exception caused restart loop. Later exceptions are the 
> same. Job seems to stuck in this permanent failure state.
> {code}
> 2019-10-03 21:42:46,159 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> clpevents -> device_filter -> processed_imps -> ios_processed_impression -> i
> mps_ts_assigner (449/1360) (d237f5e99b6a4a580498821473763edb) switched from 
> SCHEDULED to FAILED.
> java.lang.Exception: Job leader for job id ecb9ad9be934edf7b1a4f7b9dd6df365 
> lost leadership.
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1526)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14316) stuck in "Job leader ... lost leadership" error

2019-10-07 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16946326#comment-16946326
 ] 

Steven Zhen Wu commented on FLINK-14316:


From the TM log, I also noticed that ZooKeeperLeaderRetrievalService started, 
stopped, then started again.

 
{code:java}
2019-10-06 16:07:32,509 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
...
2019-10-06 16:07:35,018 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
...
2019-10-06 16:07:37,546 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.

 {code}

> stuck in "Job leader ... lost leadership" error
> ---
>
> Key: FLINK-14316
> URL: https://issues.apache.org/jira/browse/FLINK-14316
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: FLINK-14316.tgz
>
>
> This is the first exception caused restart loop. Later exceptions are the 
> same. Job seems to stuck in this permanent failure state.
> {code}
> 2019-10-03 21:42:46,159 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> clpevents -> device_filter -> processed_imps -> ios_processed_impression -> i
> mps_ts_assigner (449/1360) (d237f5e99b6a4a580498821473763edb) switched from 
> SCHEDULED to FAILED.
> java.lang.Exception: Job leader for job id ecb9ad9be934edf7b1a4f7b9dd6df365 
> lost leadership.
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1526)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14316) stuck in "Job leader ... lost leadership" error

2019-10-07 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16946327#comment-16946327
 ] 

Steven Zhen Wu commented on FLINK-14316:


My colleague [~pgoyal] will attach a patch, which does seem to fix this 
specific problem.

However, we still don't know how we got here. There is no change in the 
application/Flink code, and we suddenly started to experience this problem. 
Maybe some dynamics changed in the infrastructure/environment that caused this 
problem to show up.

> stuck in "Job leader ... lost leadership" error
> ---
>
> Key: FLINK-14316
> URL: https://issues.apache.org/jira/browse/FLINK-14316
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: FLINK-14316.tgz
>
>
> This is the first exception caused restart loop. Later exceptions are the 
> same. Job seems to stuck in this permanent failure state.
> {code}
> 2019-10-03 21:42:46,159 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> clpevents -> device_filter -> processed_imps -> ios_processed_impression -> i
> mps_ts_assigner (449/1360) (d237f5e99b6a4a580498821473763edb) switched from 
> SCHEDULED to FAILED.
> java.lang.Exception: Job leader for job id ecb9ad9be934edf7b1a4f7b9dd6df365 
> lost leadership.
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1526)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14358) [Web UI] configuration tab for jobmanager has improper width for prop key

2019-10-09 Thread Steven Zhen Wu (Jira)
Steven Zhen Wu created FLINK-14358:
--

 Summary: [Web UI] configuration tab for jobmanager has improper 
width for prop key
 Key: FLINK-14358
 URL: https://issues.apache.org/jira/browse/FLINK-14358
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.9.0
Reporter: Steven Zhen Wu
 Attachments: image-2019-10-09-16-48-30-161.png

!image-2019-10-09-16-48-30-161.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14358) [Web UI] configuration tab for jobmanager has improper width for prop key

2019-10-09 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-14358:
---
Description: 
You can see that the key column shows only 2 characters, which makes it unreadable.

 

!image-2019-10-09-16-48-30-161.png!

  was:!image-2019-10-09-16-48-30-161.png!


> [Web UI] configuration tab for jobmanager has improper width for prop key
> -
>
> Key: FLINK-14358
> URL: https://issues.apache.org/jira/browse/FLINK-14358
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.9.0
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: image-2019-10-09-16-48-30-161.png
>
>
> You can see the key column shows only 2 chars which make it not readable.
>  
> !image-2019-10-09-16-48-30-161.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14358) [Web UI] configuration tab for jobmanager has improper width for prop key

2019-10-09 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-14358:
---
Description: 
You can see that the key column shows only 2 characters, which makes it 
unreadable. This is on Chrome Version 77.0.3865.90 (Official Build) (64-bit).

 

!image-2019-10-09-16-48-30-161.png!

  was:
You can see the key column shows only 2 chars which make it not readable.

 

!image-2019-10-09-16-48-30-161.png!


> [Web UI] configuration tab for jobmanager has improper width for prop key
> -
>
> Key: FLINK-14358
> URL: https://issues.apache.org/jira/browse/FLINK-14358
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Affects Versions: 1.9.0
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: image-2019-10-09-16-48-30-161.png
>
>
> You can see the key column shows only 2 chars which make it not readable. 
> This is on Chrome Version 77.0.3865.90 (Official Build) (64-bit).
>  
> !image-2019-10-09-16-48-30-161.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14316) stuck in "Job leader ... lost leadership" error

2019-10-13 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16950470#comment-16950470
 ] 

Steven Zhen Wu commented on FLINK-14316:


[~trohrmann] we would love to hear your thoughts on two specific questions:
 # Piyush's patch clearly fixed whatever bug caused this issue. Do you see any 
other implication/downside of such a change? If it looks good, we can create an 
official PR upstream.
 # We still haven't been able to identify the root cause that made this bug 
show up. The job had been running stably and there was no code change. Any idea?

Some background on this job:
 * 235 containers, each with 8 CPUs/slots; parallelism is 1,880. When running 
into this problem, we also tried 50 containers (with parallelism 400) and it 
was still failing.
 * It is a large-state job (a few TBs), although we don't think that matters; 
we tried to redeploy the job with empty state and it still suffered the same 
failure loop.

> stuck in "Job leader ... lost leadership" error
> ---
>
> Key: FLINK-14316
> URL: https://issues.apache.org/jira/browse/FLINK-14316
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Steven Zhen Wu
>Priority: Major
> Attachments: FLINK-14316.tgz, RpcConnection.patch
>
>
> This is the first exception caused restart loop. Later exceptions are the 
> same. Job seems to stuck in this permanent failure state.
> {code}
> 2019-10-03 21:42:46,159 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> clpevents -> device_filter -> processed_imps -> ios_processed_impression -> i
> mps_ts_assigner (449/1360) (d237f5e99b6a4a580498821473763edb) switched from 
> SCHEDULED to FAILED.
> java.lang.Exception: Job leader for job id ecb9ad9be934edf7b1a4f7b9dd6df365 
> lost leadership.
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1526)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14164) Add a metric to show failover count regarding fine grained recovery

2019-11-18 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16977152#comment-16977152
 ] 

Steven Zhen Wu commented on FLINK-14164:


[~zhuzh] yeah. Gauge is fine

> Add a metric to show failover count regarding fine grained recovery
> ---
>
> Key: FLINK-14164
> URL: https://issues.apache.org/jira/browse/FLINK-14164
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination, Runtime / Metrics
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Previously Flink uses restart all strategy to recover jobs from failures. And 
> the metric "fullRestart" is used to show the count of failovers.
> However, with fine grained recovery introduced in 1.9.0, the "fullRestart" 
> metric only reveals how many times the entire graph has been restarted, not 
> including the number of fine grained failure recoveries.
> As many users want to build their job alerting based on failovers, I'd 
> propose to add such a new metric {{numberOfRestarts}} which also respects 
> fine grained recoveries. The metric should be a Gauge.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14943) make callback and related members protected so that user can override the callback handling

2019-11-25 Thread Steven Zhen Wu (Jira)
Steven Zhen Wu created FLINK-14943:
--

 Summary: make callback and related members protected so that user 
can override the callback handling
 Key: FLINK-14943
 URL: https://issues.apache.org/jira/browse/FLINK-14943
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.9.1
Reporter: Steven Zhen Wu


FlinkKafkaProducer's `logFailuresOnly` flag offers only two modes: log every 
failure or fail on every failure. We want to selectively drop/skip records on 
certain exceptions, like `RecordTooLargeException`, so we want to override the 
callback when extending the FlinkKafkaProducer class. Hence this request to 
make callback, asyncException, and acknowledgeMessage protected.
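
For illustration, here is a rough sketch of the kind of callback we would like 
to plug in from a FlinkKafkaProducer subclass once the callback handling is 
protected. This is only a sketch: `SelectiveDropCallback` and the delegate 
wiring are made-up names, and only the Kafka client types (Callback, 
RecordMetadata, RecordTooLargeException) are existing APIs.

{code:java}
// Sketch only: assumes FlinkKafkaProducer's callback handling becomes overridable.
// SelectiveDropCallback is an illustrative name; the Kafka client types are real.
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SelectiveDropCallback implements Callback {

    private static final Logger LOG = LoggerFactory.getLogger(SelectiveDropCallback.class);

    // Delegate that keeps the producer's existing behavior (e.g. fail the job).
    private final Callback defaultCallback;

    public SelectiveDropCallback(Callback defaultCallback) {
        this.defaultCallback = defaultCallback;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // Drop/skip records that exceed the broker's max message size
            // instead of failing the whole job.
            LOG.warn("Skipping oversized record", exception);
            return;
        }
        // All other outcomes (including success) keep the existing handling.
        defaultCallback.onCompletion(metadata, exception);
    }
}
{code}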



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-11843) Dispatcher fails to recover jobs if leader change happens during JobManagerRunner termination

2019-07-18 Thread Steven Zhen Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16888172#comment-16888172
 ] 

Steven Zhen Wu commented on FLINK-11843:


[~Tison] where do I email you the log?

> Dispatcher fails to recover jobs if leader change happens during 
> JobManagerRunner termination
> -
>
> Key: FLINK-11843
> URL: https://issues.apache.org/jira/browse/FLINK-11843
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.0, 1.9.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.9.0, 1.10.0
>
>
> The {{Dispatcher}} fails to recover jobs if a leader change happens during 
> the {{JobManagerRunner}} termination of the previous run. The problem is that 
> we schedule the start future of the recovered {{JobGraph}} using the 
> {{MainThreadExecutor}} and additionally require that this future is completed 
> before any other recovery operation from a subsequent leadership session is 
> executed. If now the leadership changes, the {{MainThreadExecutor}} will be 
> invalidated and the scheduled future will never be completed.
> The relevant ML thread: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/1-7-1-job-stuck-in-suspended-state-td26439.html



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-14316) stuck in "Job leader ... lost leadership" error

2020-03-31 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072298#comment-17072298
 ] 

Steven Zhen Wu commented on FLINK-14316:


[~trohrmann] thanks a lot for the follow-up. we will upgrade to `1.10.1` as 
soon as it is released.

> stuck in "Job leader ... lost leadership" error
> ---
>
> Key: FLINK-14316
> URL: https://issues.apache.org/jira/browse/FLINK-14316
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.9.3, 1.10.1, 1.11.0
>
> Attachments: FLINK-14316.tgz, RpcConnection.patch
>
>
> This is the first exception caused restart loop. Later exceptions are the 
> same. Job seems to stuck in this permanent failure state.
> {code}
> 2019-10-03 21:42:46,159 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> clpevents -> device_filter -> processed_imps -> ios_processed_impression -> i
> mps_ts_assigner (449/1360) (d237f5e99b6a4a580498821473763edb) switched from 
> SCHEDULED to FAILED.
> java.lang.Exception: Job leader for job id ecb9ad9be934edf7b1a4f7b9dd6df365 
> lost leadership.
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1526)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15846) TaskExecutorResourceUtils should return config strs with mb as unit for better readability

2020-02-02 Thread Steven Zhen Wu (Jira)
Steven Zhen Wu created FLINK-15846:
--

 Summary: TaskExecutorResourceUtils should return config strs with 
mb as unit for better readability
 Key: FLINK-15846
 URL: https://issues.apache.org/jira/browse/FLINK-15846
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Steven Zhen Wu


With FLIP-49 released in 1.10, the config strings returned by 
`TaskExecutorResourceUtils` use bytes as the unit, which makes them very 
difficult to read when inspecting logs. It is hard to believe anyone will ever 
configure memory in bytes; most likely it will be megabytes or gigabytes.

e.g. from logs.

BASH_JAVA_UTILS_EXEC_RESULT:-D 
taskmanager.memory.framework.off-heap.size=2147483648b -D 
taskmanager.memory.network.max=16384b -D 
taskmanager.memory.network.min=16384b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=2616406869b -D taskmanager.cpu.cores=2.0 -D 
taskmanager.memory.task.heap.size=1479068832b -D 
taskmanager.memory.task.off-heap.size=0b 

-Xmx1613286560
 -Xms1613286560
-XX:MaxDirectMemorySize=2311323648
-XX:MaxMetaspaceSize=536870912



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-12781) REST handler should return full stack trace instead of just error msg

2020-02-02 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu closed FLINK-12781.
--
Fix Version/s: 1.9.0
   Resolution: Fixed

> REST handler should return full stack trace instead of just error msg
> -
>
> Key: FLINK-12781
> URL: https://issues.apache.org/jira/browse/FLINK-12781
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.7.2
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> We use REST api to start a job in Flink cluster.
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-run
> When there is exception during job construction, the response payload doesn't 
> contain the full stack trace. 
> {code}
> {"errors":["org.apache.flink.client.program.ProgramInvocationException: The 
> main method caused an error."]}
> {code}
> This problem becomes more serious after FLINK-11134 got released in 1.7.2, 
> because stack trace is completely lost now. FLINK-11134 is doing the right 
> thing. We just need the response payload to contain the full stack trace, 
> which seems to have been an issue/fix since 1.5. we know 1.4 doesn't have 
> this issue and correctly return full stack trace
> on the jobmanager log, we only get
> {code}
> 2019-06-07 17:42:40,136 ERROR 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Exception 
> occurred in REST handler: 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15846) TaskExecutorResourceUtils should return config strs with mb as unit for better readability

2020-02-02 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-15846:
---
Description: 
With FLIP-49 being released in 1.10. The config strs returned 
`TaskExecutorResourceUtils` are using bytes as unit which are very difficult to 
read when inspecting logs. It is hard to believe anyone will ever configure 
memory using bytes as unit. mostly likely either mega-bytes or giga-bytes.

e.g. from logs.
```
BASH_JAVA_UTILS_EXEC_RESULT:-D 
taskmanager.memory.framework.off-heap.size=2147483648b -D 
taskmanager.memory.network.max=16384b -D 
taskmanager.memory.network.min=16384b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=2616406869b -D taskmanager.cpu.cores=2.0 -D 
taskmanager.memory.task.heap.size=1479068832b -D 
taskmanager.memory.task.off-heap.size=0b 
```
```
-Xmx1613286560
 -Xms1613286560
-XX:MaxDirectMemorySize=2311323648
-XX:MaxMetaspaceSize=536870912
```

  was:
With FLIP-49 being released in 1.10. The config strs returned 
`TaskExecutorResourceUtils` are using bytes as unit which are very difficult to 
read when inspecting logs. It is hard to believe anyone will ever configure 
memory using bytes as unit. mostly likely either mega-bytes or giga-bytes.

e.g. from logs.

BASH_JAVA_UTILS_EXEC_RESULT:-D 
taskmanager.memory.framework.off-heap.size=2147483648b -D 
taskmanager.memory.network.max=16384b -D 
taskmanager.memory.network.min=16384b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=2616406869b -D taskmanager.cpu.cores=2.0 -D 
taskmanager.memory.task.heap.size=1479068832b -D 
taskmanager.memory.task.off-heap.size=0b 

-Xmx1613286560
 -Xms1613286560
-XX:MaxDirectMemorySize=2311323648
-XX:MaxMetaspaceSize=536870912


> TaskExecutorResourceUtils should return config strs with mb as unit for 
> better readability
> --
>
> Key: FLINK-15846
> URL: https://issues.apache.org/jira/browse/FLINK-15846
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Steven Zhen Wu
>Priority: Major
>
> With FLIP-49 being released in 1.10. The config strs returned 
> `TaskExecutorResourceUtils` are using bytes as unit which are very difficult 
> to read when inspecting logs. It is hard to believe anyone will ever 
> configure memory using bytes as unit. mostly likely either mega-bytes or 
> giga-bytes.
> e.g. from logs.
> ```
> BASH_JAVA_UTILS_EXEC_RESULT:-D 
> taskmanager.memory.framework.off-heap.size=2147483648b -D 
> taskmanager.memory.network.max=16384b -D 
> taskmanager.memory.network.min=16384b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=2616406869b -D taskmanager.cpu.cores=2.0 -D 
> taskmanager.memory.task.heap.size=1479068832b -D 
> taskmanager.memory.task.off-heap.size=0b 
> ```
> ```
> -Xmx1613286560
>  -Xms1613286560
> -XX:MaxDirectMemorySize=2311323648
> -XX:MaxMetaspaceSize=536870912
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15846) TaskExecutorResourceUtils should return config strs with mb as unit for better readability

2020-02-02 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-15846:
---
Description: 
With FLIP-49 being released in 1.10. The config strs returned 
`TaskExecutorResourceUtils` are using bytes as unit which are very difficult to 
read when inspecting logs. It is hard to believe anyone will ever configure 
memory using bytes as unit. mostly likely either mega-bytes or giga-bytes.

e.g. from logs.

BASH_JAVA_UTILS_EXEC_RESULT:-D 
taskmanager.memory.framework.off-heap.size=2147483648b -D 
taskmanager.memory.network.max=16384b -D 
taskmanager.memory.network.min=16384b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=2616406869b -D taskmanager.cpu.cores=2.0 -D 
taskmanager.memory.task.heap.size=1479068832b -D 
taskmanager.memory.task.off-heap.size=0b 


-Xmx1613286560
 -Xms1613286560
-XX:MaxDirectMemorySize=2311323648
-XX:MaxMetaspaceSize=536870912


  was:
With FLIP-49 being released in 1.10. The config strs returned 
`TaskExecutorResourceUtils` are using bytes as unit which are very difficult to 
read when inspecting logs. It is hard to believe anyone will ever configure 
memory using bytes as unit. mostly likely either mega-bytes or giga-bytes.

e.g. from logs.
```
BASH_JAVA_UTILS_EXEC_RESULT:-D 
taskmanager.memory.framework.off-heap.size=2147483648b -D 
taskmanager.memory.network.max=16384b -D 
taskmanager.memory.network.min=16384b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=2616406869b -D taskmanager.cpu.cores=2.0 -D 
taskmanager.memory.task.heap.size=1479068832b -D 
taskmanager.memory.task.off-heap.size=0b 
```
```
-Xmx1613286560
 -Xms1613286560
-XX:MaxDirectMemorySize=2311323648
-XX:MaxMetaspaceSize=536870912
```


> TaskExecutorResourceUtils should return config strs with mb as unit for 
> better readability
> --
>
> Key: FLINK-15846
> URL: https://issues.apache.org/jira/browse/FLINK-15846
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Steven Zhen Wu
>Priority: Major
>
> With FLIP-49 being released in 1.10. The config strs returned 
> `TaskExecutorResourceUtils` are using bytes as unit which are very difficult 
> to read when inspecting logs. It is hard to believe anyone will ever 
> configure memory using bytes as unit. mostly likely either mega-bytes or 
> giga-bytes.
> e.g. from logs.
> BASH_JAVA_UTILS_EXEC_RESULT:-D 
> taskmanager.memory.framework.off-heap.size=2147483648b -D 
> taskmanager.memory.network.max=16384b -D 
> taskmanager.memory.network.min=16384b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=2616406869b -D taskmanager.cpu.cores=2.0 -D 
> taskmanager.memory.task.heap.size=1479068832b -D 
> taskmanager.memory.task.off-heap.size=0b 
> -Xmx1613286560
>  -Xms1613286560
> -XX:MaxDirectMemorySize=2311323648
> -XX:MaxMetaspaceSize=536870912



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15846) TaskExecutorProcessUtils should return config strs with mb as unit for better readability

2020-02-02 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-15846:
---
Summary: TaskExecutorProcessUtils should return config strs with mb as unit 
for better readability  (was: TaskExecutorResourceUtils should return config 
strs with mb as unit for better readability)

> TaskExecutorProcessUtils should return config strs with mb as unit for better 
> readability
> -
>
> Key: FLINK-15846
> URL: https://issues.apache.org/jira/browse/FLINK-15846
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Steven Zhen Wu
>Priority: Major
>
> With FLIP-49 being released in 1.10. The config strs returned 
> `TaskExecutorResourceUtils` are using bytes as unit which are very difficult 
> to read when inspecting logs. It is hard to believe anyone will ever 
> configure memory using bytes as unit. mostly likely either mega-bytes or 
> giga-bytes.
> e.g. from logs.
> BASH_JAVA_UTILS_EXEC_RESULT:-D 
> taskmanager.memory.framework.off-heap.size=2147483648b -D 
> taskmanager.memory.network.max=16384b -D 
> taskmanager.memory.network.min=16384b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=2616406869b -D taskmanager.cpu.cores=2.0 -D 
> taskmanager.memory.task.heap.size=1479068832b -D 
> taskmanager.memory.task.off-heap.size=0b 
> -Xmx1613286560
>  -Xms1613286560
> -XX:MaxDirectMemorySize=2311323648
> -XX:MaxMetaspaceSize=536870912



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15846) TaskExecutorProcessUtils should return config strs with mb as unit for better readability

2020-02-02 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028669#comment-17028669
 ] 

Steven Zhen Wu commented on FLINK-15846:


OK, if you need byte-level accuracy.

But I often look at the JVM args and find these lines from the bash `ps` output 
difficult to read:

-Xmx1613286560
-Xms1613286560
-XX:MaxDirectMemorySize=2311323648
-XX:MaxMetaspaceSize=536870912

> TaskExecutorProcessUtils should return config strs with mb as unit for better 
> readability
> -
>
> Key: FLINK-15846
> URL: https://issues.apache.org/jira/browse/FLINK-15846
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> With FLIP-49 being released in 1.10. The config strs returned 
> `TaskExecutorResourceUtils` are using bytes as unit which are very difficult 
> to read when inspecting logs. It is hard to believe anyone will ever 
> configure memory using bytes as unit. mostly likely either mega-bytes or 
> giga-bytes.
> e.g. from logs.
> BASH_JAVA_UTILS_EXEC_RESULT:-D 
> taskmanager.memory.framework.off-heap.size=2147483648b -D 
> taskmanager.memory.network.max=16384b -D 
> taskmanager.memory.network.min=16384b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=2616406869b -D taskmanager.cpu.cores=2.0 -D 
> taskmanager.memory.task.heap.size=1479068832b -D 
> taskmanager.memory.task.off-heap.size=0b 
> -Xmx1613286560
>  -Xms1613286560
> -XX:MaxDirectMemorySize=2311323648
> -XX:MaxMetaspaceSize=536870912



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15846) TaskExecutorProcessUtils should return config strs with mb as unit for better readability

2020-02-02 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028680#comment-17028680
 ] 

Steven Zhen Wu commented on FLINK-15846:


Good to know. Closing this Jira.

> TaskExecutorProcessUtils should return config strs with mb as unit for better 
> readability
> -
>
> Key: FLINK-15846
> URL: https://issues.apache.org/jira/browse/FLINK-15846
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> With FLIP-49 being released in 1.10. The config strs returned 
> `TaskExecutorResourceUtils` are using bytes as unit which are very difficult 
> to read when inspecting logs. It is hard to believe anyone will ever 
> configure memory using bytes as unit. mostly likely either mega-bytes or 
> giga-bytes.
> e.g. from logs.
> BASH_JAVA_UTILS_EXEC_RESULT:-D 
> taskmanager.memory.framework.off-heap.size=2147483648b -D 
> taskmanager.memory.network.max=16384b -D 
> taskmanager.memory.network.min=16384b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=2616406869b -D taskmanager.cpu.cores=2.0 -D 
> taskmanager.memory.task.heap.size=1479068832b -D 
> taskmanager.memory.task.off-heap.size=0b 
> -Xmx1613286560
>  -Xms1613286560
> -XX:MaxDirectMemorySize=2311323648
> -XX:MaxMetaspaceSize=536870912



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15846) TaskExecutorProcessUtils should return config strs with mb as unit for better readability

2020-02-02 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu closed FLINK-15846.
--
Resolution: Invalid

> TaskExecutorProcessUtils should return config strs with mb as unit for better 
> readability
> -
>
> Key: FLINK-15846
> URL: https://issues.apache.org/jira/browse/FLINK-15846
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Reporter: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> With FLIP-49 being released in 1.10. The config strs returned 
> `TaskExecutorResourceUtils` are using bytes as unit which are very difficult 
> to read when inspecting logs. It is hard to believe anyone will ever 
> configure memory using bytes as unit. mostly likely either mega-bytes or 
> giga-bytes.
> e.g. from logs.
> BASH_JAVA_UTILS_EXEC_RESULT:-D 
> taskmanager.memory.framework.off-heap.size=2147483648b -D 
> taskmanager.memory.network.max=16384b -D 
> taskmanager.memory.network.min=16384b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=2616406869b -D taskmanager.cpu.cores=2.0 -D 
> taskmanager.memory.task.heap.size=1479068832b -D 
> taskmanager.memory.task.off-heap.size=0b 
> -Xmx1613286560
>  -Xms1613286560
> -XX:MaxDirectMemorySize=2311323648
> -XX:MaxMetaspaceSize=536870912



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15907) expose private method in Configuration so that user can extend from it

2020-02-04 Thread Steven Zhen Wu (Jira)
Steven Zhen Wu created FLINK-15907:
--

 Summary: expose private method in Configuration so that user can 
extend from it
 Key: FLINK-15907
 URL: https://issues.apache.org/jira/browse/FLINK-15907
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Affects Versions: 1.9.2
Reporter: Steven Zhen Wu


We use Archaius for configuration internally: 
https://github.com/Netflix/archaius

It would be nice to expose this method as *_protected_* so that we can override 
it and forward to Archaius.

{code}
private Optional getRawValue(String key) 
{code}
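
For illustration, a sketch of what the override could look like if 
`getRawValue` became protected. `ArchaiusBackedConfiguration` is a made-up 
class name, the `Optional<Object>` return type is assumed from the 1.9 
signature shown above, and the Archaius call follows the archaius-core 1.x 
`DynamicPropertyFactory` API; this would only compile once the method's 
visibility is actually changed.

{code:java}
// Sketch only: compiles only if Configuration#getRawValue(String) is made protected.
// ArchaiusBackedConfiguration is an illustrative name; the Archaius call uses the
// archaius-core 1.x DynamicPropertyFactory API.
import java.util.Optional;

import com.netflix.config.DynamicPropertyFactory;
import org.apache.flink.configuration.Configuration;

public class ArchaiusBackedConfiguration extends Configuration {

    @Override
    protected Optional<Object> getRawValue(String key) {
        // Ask Archaius first, then fall back to the values Flink already holds.
        String dynamicValue =
                DynamicPropertyFactory.getInstance().getStringProperty(key, null).get();
        if (dynamicValue != null) {
            return Optional.of(dynamicValue);
        }
        return super.getRawValue(key);
    }
}
{code}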



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15907) expose getRawValue in Configuration so that user can override it

2020-02-04 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-15907:
---
Summary: expose getRawValue in Configuration so that user can override it  
(was: expose private method in Configuration so that user can extend from it)

> expose getRawValue in Configuration so that user can override it
> 
>
> Key: FLINK-15907
> URL: https://issues.apache.org/jira/browse/FLINK-15907
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.2
>Reporter: Steven Zhen Wu
>Priority: Major
>
> We use Archaius for configuration internally: 
> https://github.com/Netflix/archaius
> It will be nice to expose this methods as *_protected_* so that we can 
> override and forward to Archaius.
> {code}
> private Optional getRawValue(String key) 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15907) expose getRawValue as protected in Configuration so that user can override it

2020-02-04 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-15907:
---
Summary: expose getRawValue as protected in Configuration so that user can 
override it  (was: expose getRawValue in Configuration so that user can 
override it)

> expose getRawValue as protected in Configuration so that user can override it
> -
>
> Key: FLINK-15907
> URL: https://issues.apache.org/jira/browse/FLINK-15907
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.2
>Reporter: Steven Zhen Wu
>Priority: Major
>
> We use Archaius for configuration internally: 
> https://github.com/Netflix/archaius
> It will be nice to expose this methods as *_protected_* so that we can 
> override and forward to Archaius.
> {code}
> private Optional getRawValue(String key) 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15918) Uptime Metric not reset on Job Restart

2020-02-05 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17030901#comment-17030901
 ] 

Steven Zhen Wu commented on FLINK-15918:


quote an answer from our user, which is similar to what Thomas said.

"to detect a job that is not running. the reason to use uptime is to catch the 
case where the job is continually restarting, so it is mostly “up”, but never 
for a long time"

> Uptime Metric not reset on Job Restart
> --
>
> Key: FLINK-15918
> URL: https://issues.apache.org/jira/browse/FLINK-15918
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0
>Reporter: Gary Yao
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
>
> *Description*
> The {{uptime}} metric is not reset when the job restarts, which is a change 
> in behavior compared to Flink 1.8.
> This change of behavior exists since 1.9.0 if 
> {{jobmanager.execution.failover-strategy: region}} is configured,
> which we do in the default flink-conf.yaml.
> *Workarounds*
> Users that find this behavior problematic can set {{jobmanager.scheduler: 
> legacy}} and unset {{jobmanager.execution.failover-strategy: region}} in 
> their {{flink-conf.yaml}}
> *How to reproduce*
> trivial
> *Expected behavior*
> This is up for discussion. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15918) Uptime Metric not reset on Job Restart

2020-02-05 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17030901#comment-17030901
 ] 

Steven Zhen Wu edited comment on FLINK-15918 at 2/5/20 6:26 PM:


quote an answer from our user, which is similar to what Thomas said.

``` to detect a job that is not running. the reason to use uptime is to catch 
the case where the job is continually restarting, so it is mostly “up”, but 
never for a long time```


was (Author: stevenz3wu):
quote an answer from our user, which is similar to what Thomas said.

"to detect a job that is not running. the reason to use uptime is to catch the 
case where the job is continually restarting, so it is mostly “up”, but never 
for a long time"

> Uptime Metric not reset on Job Restart
> --
>
> Key: FLINK-15918
> URL: https://issues.apache.org/jira/browse/FLINK-15918
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0
>Reporter: Gary Yao
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
>
> *Description*
> The {{uptime}} metric is not reset when the job restarts, which is a change 
> in behavior compared to Flink 1.8.
> This change of behavior exists since 1.9.0 if 
> {{jobmanager.execution.failover-strategy: region}} is configured,
> which we do in the default flink-conf.yaml.
> *Workarounds*
> Users that find this behavior problematic can set {{jobmanager.scheduler: 
> legacy}} and unset {{jobmanager.execution.failover-strategy: region}} in 
> their {{flink-conf.yaml}}
> *How to reproduce*
> trivial
> *Expected behavior*
> This is up for discussion. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15918) Uptime Metric not reset on Job Restart

2020-02-05 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17030901#comment-17030901
 ] 

Steven Zhen Wu edited comment on FLINK-15918 at 2/5/20 6:26 PM:


quote an answer from our user, which is similar to what Thomas said.

```

to detect a job that is not running. the reason to use uptime is to catch the 
case where the job is continually restarting, so it is mostly “up”, but never 
for a long time

```


was (Author: stevenz3wu):
quote an answer from our user, which is similar to what Thomas said.

``` to detect a job that is not running. the reason to use uptime is to catch 
the case where the job is continually restarting, so it is mostly “up”, but 
never for a long time```

> Uptime Metric not reset on Job Restart
> --
>
> Key: FLINK-15918
> URL: https://issues.apache.org/jira/browse/FLINK-15918
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0
>Reporter: Gary Yao
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
>
> *Description*
> The {{uptime}} metric is not reset when the job restarts, which is a change 
> in behavior compared to Flink 1.8.
> This change of behavior exists since 1.9.0 if 
> {{jobmanager.execution.failover-strategy: region}} is configured,
> which we do in the default flink-conf.yaml.
> *Workarounds*
> Users that find this behavior problematic can set {{jobmanager.scheduler: 
> legacy}} and unset {{jobmanager.execution.failover-strategy: region}} in 
> their {{flink-conf.yaml}}
> *How to reproduce*
> trivial
> *Expected behavior*
> This is up for discussion. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15918) Uptime Metric not reset on Job Restart

2020-02-05 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17030901#comment-17030901
 ] 

Steven Zhen Wu edited comment on FLINK-15918 at 2/5/20 6:27 PM:


quote an answer from our user, which is similar to what Thomas said.

{quote}

to detect a job that is not running. the reason to use uptime is to catch the 
case where the job is continually restarting, so it is mostly “up”, but never 
for a long time

{quote}


was (Author: stevenz3wu):
quote an answer from our user, which is similar to what Thomas said.

> to detect a job that is not running. the reason to use uptime is to catch the 
> case where the job is continually restarting, so it is mostly “up”, but never 
> for a long time

> Uptime Metric not reset on Job Restart
> --
>
> Key: FLINK-15918
> URL: https://issues.apache.org/jira/browse/FLINK-15918
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0
>Reporter: Gary Yao
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
>
> *Description*
> The {{uptime}} metric is not reset when the job restarts, which is a change 
> in behavior compared to Flink 1.8.
> This change of behavior exists since 1.9.0 if 
> {{jobmanager.execution.failover-strategy: region}} is configured,
> which we do in the default flink-conf.yaml.
> *Workarounds*
> Users that find this behavior problematic can set {{jobmanager.scheduler: 
> legacy}} and unset {{jobmanager.execution.failover-strategy: region}} in 
> their {{flink-conf.yaml}}
> *How to reproduce*
> trivial
> *Expected behavior*
> This is up for discussion. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15918) Uptime Metric not reset on Job Restart

2020-02-05 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17030901#comment-17030901
 ] 

Steven Zhen Wu edited comment on FLINK-15918 at 2/5/20 6:27 PM:


quote an answer from our user, which is similar to what Thomas said.

> to detect a job that is not running. the reason to use uptime is to catch the 
> case where the job is continually restarting, so it is mostly “up”, but never 
> for a long time


was (Author: stevenz3wu):
quote an answer from our user, which is similar to what Thomas said.

```

to detect a job that is not running. the reason to use uptime is to catch the 
case where the job is continually restarting, so it is mostly “up”, but never 
for a long time

```

> Uptime Metric not reset on Job Restart
> --
>
> Key: FLINK-15918
> URL: https://issues.apache.org/jira/browse/FLINK-15918
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.9.2, 1.10.0
>Reporter: Gary Yao
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
>
> *Description*
> The {{uptime}} metric is not reset when the job restarts, which is a change 
> in behavior compared to Flink 1.8.
> This change of behavior exists since 1.9.0 if 
> {{jobmanager.execution.failover-strategy: region}} is configured,
> which we do in the default flink-conf.yaml.
> *Workarounds*
> Users that find this behavior problematic can set {{jobmanager.scheduler: 
> legacy}} and unset {{jobmanager.execution.failover-strategy: region}} in 
> their {{flink-conf.yaml}}
> *How to reproduce*
> trivial
> *Expected behavior*
> This is up for discussion. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15907) expose getRawValue as protected in Configuration so that user can override it

2020-02-07 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu closed FLINK-15907.
--
Resolution: Invalid

> expose getRawValue as protected in Configuration so that user can override it
> -
>
> Key: FLINK-15907
> URL: https://issues.apache.org/jira/browse/FLINK-15907
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.2
>Reporter: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> We use Archaius for configuration internally: 
> https://github.com/Netflix/archaius
> It will be nice to expose this methods as *_protected_* so that we can 
> override and forward to Archaius.
> {code}
> private Optional getRawValue(String key) 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10896) Extend state schema evolution support for more types

2020-02-26 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045782#comment-17045782
 ] 

Steven Zhen Wu commented on FLINK-10896:


+1 on Scala case classes support. It is widely used.

> Extend state schema evolution support for more types
> 
>
> Key: FLINK-10896
> URL: https://issues.apache.org/jira/browse/FLINK-10896
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>
> Whether or not a state's schema can be evolved (e.g., removing / adding 
> fields to a POJO-typed state, modifying an Avro-type state's schema, etc.) 
> depends on whether or not the type's corresponding {{TypeSerializer}} and its 
> {{TypeSerializerSnapshot}} properly supports it.
> As of Flink 1.7, we currently only have support for evolving Avro types (with 
> FLINK-10605).
> This tracks the support for other composite types that would benefit from an 
> evolvable schema, such as POJOs, tuples, Scala case classes etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17421) Backpressure new checkpoints if previous were not managed to be cleaned up yet

2020-04-27 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093792#comment-17093792
 ] 

Steven Zhen Wu commented on FLINK-17421:


What does "backpressure new checkpoints" mean? Are we suggesting delaying the 
triggering of the next checkpoint?

In our setup, we only allow one concurrent checkpoint. I assume this potential 
change doesn't affect us?

> Backpressure new checkpoints if previous were not managed to be cleaned up 
> yet 
> ---
>
> Key: FLINK-17421
> URL: https://issues.apache.org/jira/browse/FLINK-17421
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.7.2, 1.8.3, 1.9.3, 1.10.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> As reported in FLINK-17073, ioExecutor might not manage to clean up 
> checkpoints quickly enough causing ever growing memory consumption. A 
> proposed solution would be to backpressure new checkpoints in that scenario.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17531) Add a new checkpoint Guage metric: elapsedSecondsSinceLastCompletedCheckpoint

2020-05-05 Thread Steven Zhen Wu (Jira)
Steven Zhen Wu created FLINK-17531:
--

 Summary: Add a new checkpoint Guage metric: 
elapsedSecondsSinceLastCompletedCheckpoint
 Key: FLINK-17531
 URL: https://issues.apache.org/jira/browse/FLINK-17531
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Checkpointing
Affects Versions: 1.10.0
Reporter: Steven Zhen Wu


I'd like to discuss the value of a new checkpoint Gauge metric: `elapsedSecondsSinceLastCompletedCheckpoint`. The main motivation is alerting. I know the reasons below are somewhat specific to our setup, hence I want to explore the interest of the community.

*What do we want to achieve?*

We want to alert if no successful checkpoint has happened for a specific period. With this new metric, we can set up a simple alerting rule like `alert if elapsedSecondsSinceLastCompletedCheckpoint > N minutes`. It follows the good alerting pattern of "time since last success".

*What about existing checkpoint metrics?*

* `numberOfCompletedCheckpoints`. We can set up an alert like `alert if numberOfCompletedCheckpoints = 0 for N minutes`. However, it is an anti-pattern for our alerting system, as it looks for the absence of a good signal (vs an explicit bad signal). Such an anti-pattern is more prone to false alarms when there is an occasional metric drop or an alerting-system processing issue.

* `numberOfFailedCheckpoints`. That is an explicit failure signal, which is good. We can set up an alert like `alert if numberOfFailedCheckpoints > 0 in X out of Y minutes`. We have some high-parallelism, large-state jobs. Their normal checkpoint duration is under 1-2 minutes. However, when recovering from an outage with a large backlog, subtasks on one or a few containers sometimes experience very high back pressure, and it can take the checkpoint barrier more than an hour to travel through the DAG to those heavily back-pressured subtasks. The back pressure is likely caused by the multi-tenant environment and performance variation among containers. Instead of letting the checkpoint time out in this case, we decided to increase the checkpoint timeout to a very long value (like 2 hours). In theory, one could argue that we could set the checkpoint timeout to infinity: it is always better to have a long but completed checkpoint than a timed-out checkpoint, because a timed-out checkpoint gives up its position in the queue and the new checkpoint starts again at the end of the queue. Note that we are using at-least-once checkpoint semantics, so there is no barrier alignment concern. FLIP-76 (unaligned checkpoints) can help checkpoints deal with back pressure better, but it is not ready yet and also has its limitations. We think `elapsedSecondsSinceLastCompletedCheckpoint` is very intuitive to set up an alert against.
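
Until the framework exposes such a metric, a rough per-subtask approximation can be built with a pass-through operator that listens for checkpoint completions. This is only a sketch under stated assumptions: the class name and metric name are made up, `notifyCheckpointComplete` is delivered per subtask (so this approximates, rather than equals, the job-level "last completed checkpoint" time), and the `org.apache.flink.runtime.state.CheckpointListener` import reflects the 1.10-era package, which may differ in other versions:

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.state.CheckpointListener;

public class CheckpointAgeGauge<T> extends RichMapFunction<T, T> implements CheckpointListener {

    // Wall-clock millis of the last checkpoint completion seen by this subtask.
    private transient volatile long lastCompletedMillis;

    @Override
    public void open(Configuration parameters) {
        lastCompletedMillis = System.currentTimeMillis(); // use (re)start time as the baseline
        getRuntimeContext()
                .getMetricGroup()
                .gauge(
                        "elapsedSecondsSinceLastCompletedCheckpoint",
                        (Gauge<Long>) () -> (System.currentTimeMillis() - lastCompletedMillis) / 1000L);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        lastCompletedMillis = System.currentTimeMillis();
    }

    @Override
    public T map(T value) {
        return value; // pass-through; this operator only exists to host the metric
    }
}
{code}

A pass-through instance would be inserted somewhere in the pipeline, e.g. `stream.map(new CheckpointAgeGauge<>())`, purely to host the gauge for the alerting backend to scrape.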



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17531) Add a new checkpoint Gauge metric: elapsedSecondsSinceLastCompletedCheckpoint

2020-05-05 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-17531:
---
Description: 
I'd like to discuss the value of a new checkpoint Gauge metric: `elapsedSecondsSinceLastCompletedCheckpoint`. The main motivation is alerting. I know the reasons below are somewhat specific to our setup, hence I want to explore the interest of the community.

*What do we want to achieve?*

We want to alert if no successful checkpoint has happened for a specific period. With this new metric, we can set up a simple alerting rule like `alert if elapsedSecondsSinceLastCompletedCheckpoint > N minutes`. It follows the good alerting pattern of "time since last success".

*What about existing checkpoint metrics?*
 * `numberOfCompletedCheckpoints`. We can set up an alert like `alert if numberOfCompletedCheckpoints = 0 for N minutes`. However, it is an anti-pattern for our alerting system, as it looks for the absence of a good signal (vs an explicit bad signal). Such an anti-pattern is more prone to false alarms when there is an occasional metric drop or an alerting-system processing issue.

 * `numberOfFailedCheckpoints`. That is an explicit failure signal, which is good. We can set up an alert like `alert if numberOfFailedCheckpoints > 0 in X out of Y minutes`. We have some high-parallelism, large-state jobs. Their normal checkpoint duration is under 1-2 minutes. However, when recovering from an outage with a large backlog, subtasks on one or a few containers sometimes experience very high back pressure, and it can take the checkpoint barrier more than an hour to travel through the DAG to those heavily back-pressured subtasks. The back pressure is likely caused by the multi-tenant environment and performance variation among containers. Instead of letting the checkpoint time out in this case, we decided to increase the checkpoint timeout to a very long value (like 2 hours). In theory, one could argue that we could set the checkpoint timeout to infinity: it is always better to have a long but completed checkpoint than a timed-out checkpoint, because a timed-out checkpoint gives up its position in the queue and the new checkpoint starts again at the end of the queue. Note that we are using at-least-once checkpoint semantics, so there is no barrier alignment concern. FLIP-76 (unaligned checkpoints) can help checkpoints deal with back pressure better, but it is not ready yet and also has its limitations. We think `elapsedSecondsSinceLastCompletedCheckpoint` is very intuitive to set up an alert against.

  was:
like to discuss the value of a new checkpoint Guage metric: 
`elapsedSecondsSinceLastCompletedCheckpoint`. Main motivation is for alerting. 
I know reasons below are somewhat related to our setup. Hence want to explore 
the interest of the community.

*What do we want to achieve?*

We want to alert if no successful checkpoint happened for a specific period. 
With this new metric, we can set up a simple alerting rule like `alert if 
elapsedSecondsSinceLastCompletedCheckpoint > N minutes`. It is a good alerting 
pattern of `time since last success`.

*What out existing checkpoint metrics?*

* `numberOfCompletedCheckpoints`. We can set up an alert like `alert if 
numberOfCompletedCheckpoints = 0 for N minutes`. However, it is an anti-pattern 
for our alerting system, as it is looking for lack of good signal (vs explicit 
bad signal). Such an anti-pattern is easier to suffer false alarm problem when 
there is occasional metric drop or alerting system processing issue.

* numberOfFailedCheckpoints. That is an explicit failure signal, which is good. 
We can set up alert like `alert if numberOfFailedCheckpoints > 0 in X out Y 
minutes`. We have some high-parallelism large-state jobs. Their normal 
checkpoint duration is <1-2 minutes. However, when recovering from an outage 
with large backlog, sometimes subtasks from one or a few containers experienced 
super high back pressure. It took checkpoint barrier sometimes more than an 
hour to travel through the DAG to those heavy back pressured subtasks. Causes 
of the back pressure are likely due to multi-tenancy environment and 
performance variation among containers. Instead of letting checkpoint to time 
out in this case, we decided to increase checkpoint timeout value to crazy long 
value (like 2 hours). In theory, one could argue that we can set checkpoint 
timeout to infinity. It is always better to have a long but completed 
checkpoint than a timed out checkpoint, as timed out checkpoint basically give 
up its positions in the queue and new checkpoint just reset the positions back 
to the end of the queue . Note that we are using at least checkpoint semantics. 
So there is no barrier alignment concern. FLIP-76 (unaligned checkpoints) can 
help checkpoint dealing with back pressure better. It is not ready now and also 
has its limitations. We think `elapsedSec

[jira] [Updated] (FLINK-17531) Add a new checkpoint Gauge metric: elapsedSecondsSinceLastCompletedCheckpoint

2020-05-05 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-17531:
---
Description: 
I'd like to discuss the value of a new checkpoint Gauge metric: `elapsedSecondsSinceLastCompletedCheckpoint`. The main motivation is alerting. I know the reasons below are somewhat specific to our setup, hence I want to explore the interest of the community.

*What do we want to achieve?*

We want to alert if no successful checkpoint has happened for a specific period. With this new metric, we can set up a simple alerting rule like `alert if elapsedSecondsSinceLastCompletedCheckpoint > N minutes`. It follows the good alerting pattern of "time since last success". We found `elapsedSecondsSinceLastCompletedCheckpoint` very intuitive to set up an alert against.

*What about existing checkpoint metrics?*

`numberOfCompletedCheckpoints`. We can set up an alert like `alert if numberOfCompletedCheckpoints = 0 for N minutes`. However, it is an anti-pattern for our alerting system, as it looks for the absence of a good signal (vs an explicit bad signal). Such an anti-pattern is more prone to false alarms when there is an occasional metric drop or an alerting-system processing issue.

`numberOfFailedCheckpoints`. That is an explicit failure signal, which is good. We can set up an alert like `alert if numberOfFailedCheckpoints > 0 in X out of Y minutes`. We have some high-parallelism, large-state jobs. Their normal checkpoint duration is under 1-2 minutes. However, when recovering from an outage with a large backlog, subtasks on one or a few containers sometimes experience very high back pressure, and it can take the checkpoint barrier more than an hour to travel through the DAG to those heavily back-pressured subtasks. The back pressure is likely caused by the multi-tenant environment and performance variation among containers. Instead of letting the checkpoint time out in this case, we decided to increase the checkpoint timeout to a very long value (like 2 hours). With that, we kind of missed the explicit "bad" signal of a failed/timed-out checkpoint.

In theory, one could argue that we could set the checkpoint timeout to infinity: it is always better to have a long but completed checkpoint than a timed-out checkpoint, because a timed-out checkpoint gives up its position in the queue and the new checkpoint starts again at the end of the queue. Note that we are using at-least-once checkpoint semantics, so there is no barrier alignment concern. FLIP-76 (unaligned checkpoints) can help checkpoints deal with back pressure better, but it is not ready yet and also has its limitations.
  was:
like to discuss the value of a new checkpoint Guage metric: 
`elapsedSecondsSinceLastCompletedCheckpoint`. Main motivation is for alerting. 
I know reasons below are somewhat related to our setup. Hence want to explore 
the interest of the community.

*What do we want to achieve?*

We want to alert if no successful checkpoint happened for a specific period. 
With this new metric, we can set up a simple alerting rule like `alert if 
elapsedSecondsSinceLastCompletedCheckpoint > N minutes`. It is a good alerting 
pattern of `time since last success`.

*What out existing checkpoint metrics?*
 * `numberOfCompletedCheckpoints`. We can set up an alert like `alert if 
numberOfCompletedCheckpoints = 0 for N minutes`. However, it is an anti-pattern 
for our alerting system, as it is looking for lack of good signal (vs explicit 
bad signal). Such an anti-pattern is easier to suffer false alarm problem when 
there is occasional metric drop or alerting system processing issue.

 * `numberOfFailedCheckpoints`. That is an explicit failure signal, which is 
good. We can set up alert like `alert if numberOfFailedCheckpoints > 0 in X out 
Y minutes`. We have some high-parallelism large-state jobs. Their normal 
checkpoint duration is <1-2 minutes. However, when recovering from an outage 
with large backlog, sometimes subtasks from one or a few containers experienced 
super high back pressure. It took checkpoint barrier sometimes more than an 
hour to travel through the DAG to those heavy back pressured subtasks. Causes 
of the back pressure are likely due to multi-tenancy environment and 
performance variation among containers. Instead of letting checkpoint to time 
out in this case, we decided to increase checkpoint timeout value to crazy long 
value (like 2 hours). In theory, one could argue that we can set checkpoint 
timeout to infinity. It is always better to have a long but completed 
checkpoint than a timed out checkpoint, as timed out checkpoint basically give 
up its positions in the queue and new checkpoint just reset the positions back 
to the end of the queue . Note that we are using at least checkpoint semantics. 
So there is no barrier alignment concern. FLIP-76 (unaligned checkpoints) can 
help checkpoint dealing w

[jira] [Updated] (FLINK-17531) Add a new checkpoint Gauge metric: elapsedTimeSinceLastCompletedCheckpoint

2020-05-05 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-17531:
---
Description: 
I'd like to discuss the value of a new checkpoint Gauge metric: `elapsedTimeSinceLastCompletedCheckpoint`. The main motivation is alerting. I know the reasons below are somewhat specific to our setup, hence I want to explore the interest of the community.

*What do we want to achieve?*

We want to alert if no successful checkpoint has happened for a specific period. With this new metric, we can set up a simple alerting rule like `alert if elapsedTimeSinceLastCompletedCheckpoint > N minutes`. It follows the good alerting pattern of "time since last success". We found `elapsedTimeSinceLastCompletedCheckpoint` very intuitive to set up an alert against.

*What about existing checkpoint metrics?*

`numberOfCompletedCheckpoints`. We can set up an alert like `alert if numberOfCompletedCheckpoints = 0 for N minutes`. However, it is an anti-pattern for our alerting system, as it looks for the absence of a good signal (vs an explicit bad signal). Such an anti-pattern is more prone to false alarms when there is an occasional metric drop or an alerting-system processing issue.

`numberOfFailedCheckpoints`. That is an explicit failure signal, which is good. We can set up an alert like `alert if numberOfFailedCheckpoints > 0 in X out of Y minutes`. We have some high-parallelism, large-state jobs. Their normal checkpoint duration is under 1-2 minutes. However, when recovering from an outage with a large backlog, subtasks on one or a few containers sometimes experience very high back pressure, and it can take the checkpoint barrier more than an hour to travel through the DAG to those heavily back-pressured subtasks. The back pressure is likely caused by the multi-tenant environment and performance variation among containers. Instead of letting the checkpoint time out in this case, we decided to increase the checkpoint timeout to a very long value (like 2 hours). With that, we kind of missed the explicit "bad" signal of a failed/timed-out checkpoint.

In theory, one could argue that we could set the checkpoint timeout to infinity: it is always better to have a long but completed checkpoint than a timed-out checkpoint, because a timed-out checkpoint gives up its position in the queue and the new checkpoint starts again at the end of the queue. Note that we are using at-least-once checkpoint semantics, so there is no barrier alignment concern. FLIP-76 (unaligned checkpoints) can help checkpoints deal with back pressure better, but it is not ready yet and also has its limitations.

  was:
like to discuss the value of a new checkpoint Guage metric: 
`elapsedSecondsSinceLastCompletedCheckpoint`. Main motivation is for alerting. 
I know reasons below are somewhat related to our setup. Hence want to explore 
the interest of the community.

 

*What do we want to achieve?*

We want to alert if no successful checkpoint happened for a specific period. 
With this new metric, we can set up a simple alerting rule like `alert if 
elapsedSecondsSinceLastCompletedCheckpoint > N minutes`. It is a good alerting 
pattern of `time since last success`. We found 
`elapsedSecondsSinceLastCompletedCheckpoint` very intuitive to set up alert 
against.

 

*What out existing checkpoint metrics?*

`numberOfCompletedCheckpoints`. We can set up an alert like `alert if 
numberOfCompletedCheckpoints = 0 for N minutes`. However, it is an anti-pattern 
for our alerting system, as it is looking for lack of good signal (vs explicit 
bad signal). Such an anti-pattern is easier to suffer false alarm problem when 
there is occasional metric drop or alerting system processing issue.

 

`numberOfFailedCheckpoints`. That is an explicit failure signal, which is good. 
We can set up alert like `alert if numberOfFailedCheckpoints > 0 in X out Y 
minutes`. We have some high-parallelism large-state jobs. Their normal 
checkpoint duration is <1-2 minutes. However, when recovering from an outage 
with large backlog, sometimes subtasks from one or a few containers experienced 
super high back pressure. It took checkpoint barrier sometimes more than an 
hour to travel through the DAG to those heavy back pressured subtasks. Causes 
of the back pressure are likely due to multi-tenancy environment and 
performance variation among containers. Instead of letting checkpoint to time 
out in this case, we decided to increase checkpoint timeout value to crazy long 
value (like 2 hours). With that, we kind of missed the explicit "bad" signal of 
failed/timed out checkpoint.

 

In theory, one could argue that we can set checkpoint timeout to infinity. It 
is always better to have a long but completed checkpoint than a timed out 
checkpoint, as timed out checkpoint basically give up its positions in the 
queue and new checkpoint just reset the positions back to 

[jira] [Updated] (FLINK-17531) Add a new checkpoint Gauge metric: elapsedTimeSinceLastCompletedCheckpoint

2020-05-05 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-17531:
---
Description: 
I'd like to discuss the value of a new checkpoint Gauge metric: `elapsedTimeSinceLastCompletedCheckpoint`. The main motivation is alerting. I know the reasons below are somewhat specific to our setup, hence I want to explore the interest of the community.

*What do we want to achieve?*

We want to alert if no successful checkpoint has happened for a specific period. With this new metric, we can set up a simple alerting rule like `alert if elapsedTimeSinceLastCompletedCheckpoint > N minutes`. It follows the good alerting pattern of "time since last success". We found `elapsedTimeSinceLastCompletedCheckpoint` very intuitive to set up an alert against.

*What about existing checkpoint metrics?*

`numberOfCompletedCheckpoints`. We can set up an alert like `alert if numberOfCompletedCheckpoints = 0 for N minutes`. However, it is an anti-pattern for our alerting system, as it looks for the absence of a good signal (vs an explicit bad signal). Such an anti-pattern is more prone to false alarms when there is an occasional metric drop or an alerting-system processing issue.

`numberOfFailedCheckpoints`. That is an explicit failure signal, which is good. We can set up an alert like `alert if numberOfFailedCheckpoints > 0 in X out of Y minutes`. We have some high-parallelism, large-state jobs. Their normal checkpoint duration is under 1-2 minutes. However, when recovering from an outage with a large backlog, subtasks on one or a few containers sometimes experience very high back pressure, and it can take the checkpoint barrier more than an hour to travel through the DAG to those heavily back-pressured subtasks. The back pressure is likely caused by the multi-tenant environment and performance variation among containers. Instead of letting the checkpoint time out in this case, we decided to increase the checkpoint timeout to a very long value (like 2 hours). With that, we kind of missed the explicit "bad" signal of a failed/timed-out checkpoint.

In theory, one could argue that we could set the checkpoint timeout to infinity: it is always better to have a long but completed checkpoint than a timed-out checkpoint, because a timed-out checkpoint gives up its position in the queue and the new checkpoint starts again at the end of the queue. Note that we are using at-least-once checkpoint semantics, so there is no barrier alignment concern. FLIP-76 (unaligned checkpoints) can help checkpoints deal with back pressure better, but it is not ready yet and also has its limitations. That is a separate discussion.

  was:
like to discuss the value of a new checkpoint Guage metric: 
`elapsedTimeSinceLastCompletedCheckpoint`. Main motivation is for alerting. I 
know reasons below are somewhat related to our setup. Hence want to explore the 
interest of the community.

 

*What do we want to achieve?*

We want to alert if no successful checkpoint happened for a specific period. 
With this new metric, we can set up a simple alerting rule like `alert if 
elapsedTimeSinceLastCompletedCheckpoint > N minutes`. It is a good alerting 
pattern of `time since last success`. We found 
`elapsedTimeSinceLastCompletedCheckpoint` very intuitive to set up alert 
against.

 

*What out existing checkpoint metrics?*

`numberOfCompletedCheckpoints`. We can set up an alert like `alert if 
numberOfCompletedCheckpoints = 0 for N minutes`. However, it is an anti-pattern 
for our alerting system, as it is looking for lack of good signal (vs explicit 
bad signal). Such an anti-pattern is easier to suffer false alarm problem when 
there is occasional metric drop or alerting system processing issue.

 

`numberOfFailedCheckpoints`. That is an explicit failure signal, which is good. 
We can set up alert like `alert if numberOfFailedCheckpoints > 0 in X out Y 
minutes`. We have some high-parallelism large-state jobs. Their normal 
checkpoint duration is <1-2 minutes. However, when recovering from an outage 
with large backlog, sometimes subtasks from one or a few containers experienced 
super high back pressure. It took checkpoint barrier sometimes more than an 
hour to travel through the DAG to those heavy back pressured subtasks. Causes 
of the back pressure are likely due to multi-tenancy environment and 
performance variation among containers. Instead of letting checkpoint to time 
out in this case, we decided to increase checkpoint timeout value to crazy long 
value (like 2 hours). With that, we kind of missed the explicit "bad" signal of 
failed/timed out checkpoint.

 

In theory, one could argue that we can set checkpoint timeout to infinity. It 
is always better to have a long but completed checkpoint than a timed out 
checkpoint, as timed out checkpoint basically give up its positions in the 
queue and new checkpoint just reset 

[jira] [Updated] (FLINK-17531) Add a new checkpoint Gauge metric: elapsedTimeSinceLastCompletedCheckpoint

2020-05-06 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-17531:
---
Description: 
I'd like to discuss the value of a new checkpoint Gauge metric: `elapsedTimeSinceLastCompletedCheckpoint`. The main motivation is alerting. I know the reasons below are somewhat specific to our setup, hence I want to explore the interest of the community.

*What do we want to achieve?*

We want to alert if no successful checkpoint has happened for a specific period. With this new metric, we can set up a simple alerting rule like `alert if elapsedTimeSinceLastCompletedCheckpoint > N minutes`. It follows the good alerting pattern of "time since last success". We found `elapsedTimeSinceLastCompletedCheckpoint` very intuitive to set up an alert against.

*What about existing checkpoint metrics?*

`numberOfCompletedCheckpoints`. We can set up an alert like `alert if derivative(numberOfCompletedCheckpoints) == 0 for N minutes`. However, it is an anti-pattern for our alerting system, as it looks for the absence of a good signal (vs an explicit bad signal). Such an anti-pattern is more prone to false alarms when there is an occasional metric drop or an alerting-system processing issue.

`numberOfFailedCheckpoints`. That is an explicit failure signal, which is good. We can set up an alert like `alert if derivative(numberOfFailedCheckpoints) > 0 in X out of Y minutes`. We have some high-parallelism, large-state jobs. Their normal checkpoint duration is under 1-2 minutes. However, when recovering from an outage with a large backlog, subtasks on one or a few containers sometimes experience very high back pressure, and it can take the checkpoint barrier more than an hour to travel through the DAG to those heavily back-pressured subtasks. The back pressure is likely caused by the multi-tenant environment and performance variation among containers. Instead of letting the checkpoint time out in this case, we decided to increase the checkpoint timeout to a very long value (like 2 hours). With that, we kind of missed the explicit "bad" signal of a failed/timed-out checkpoint.

In theory, one could argue that we could set the checkpoint timeout to infinity: it is always better to have a long but completed checkpoint than a timed-out checkpoint, because a timed-out checkpoint gives up its position in the queue and the new checkpoint starts again at the end of the queue. Note that we are using at-least-once checkpoint semantics, so there is no barrier alignment concern. FLIP-76 (unaligned checkpoints) can help checkpoints deal with back pressure better, but it is not ready yet and also has its limitations. That is a separate discussion.

  was:
like to discuss the value of a new checkpoint Guage metric: 
`elapsedTimeSinceLastCompletedCheckpoint`. Main motivation is for alerting. I 
know reasons below are somewhat related to our setup. Hence want to explore the 
interest of the community.

 

*What do we want to achieve?*

We want to alert if no successful checkpoint happened for a specific period. 
With this new metric, we can set up a simple alerting rule like `alert if 
elapsedTimeSinceLastCompletedCheckpoint > N minutes`. It is a good alerting 
pattern of `time since last success`. We found 
`elapsedTimeSinceLastCompletedCheckpoint` very intuitive to set up alert 
against.

 

*What out existing checkpoint metrics?*

`numberOfCompletedCheckpoints`. We can set up an alert like `alert if 
numberOfCompletedCheckpoints = 0 for N minutes`. However, it is an anti-pattern 
for our alerting system, as it is looking for lack of good signal (vs explicit 
bad signal). Such an anti-pattern is easier to suffer false alarm problem when 
there is occasional metric drop or alerting system processing issue.

 

`numberOfFailedCheckpoints`. That is an explicit failure signal, which is good. 
We can set up alert like `alert if numberOfFailedCheckpoints > 0 in X out Y 
minutes`. We have some high-parallelism large-state jobs. Their normal 
checkpoint duration is <1-2 minutes. However, when recovering from an outage 
with large backlog, sometimes subtasks from one or a few containers experienced 
super high back pressure. It took checkpoint barrier sometimes more than an 
hour to travel through the DAG to those heavy back pressured subtasks. Causes 
of the back pressure are likely due to multi-tenancy environment and 
performance variation among containers. Instead of letting checkpoint to time 
out in this case, we decided to increase checkpoint timeout value to crazy long 
value (like 2 hours). With that, we kind of missed the explicit "bad" signal of 
failed/timed out checkpoint.

 

In theory, one could argue that we can set checkpoint timeout to infinity. It 
is always better to have a long but completed checkpoint than a timed out 
checkpoint, as timed out checkpoint basically give up its positions in the 
queue and 

[jira] [Updated] (FLINK-17531) Add a new checkpoint Gauge metric: elapsedTimeSinceLastCompletedCheckpoint

2020-05-07 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-17531:
---
Description: 
I'd like to discuss the value of a new checkpoint Gauge metric: `elapsedTimeSinceLastCompletedCheckpoint`. The main motivation is alerting. I know the reasons below are somewhat specific to our setup, hence I want to explore the interest of the community.

*What do we want to achieve?*

We want to alert if no successful checkpoint has happened for a specific period. With this new metric, we can set up a simple alerting rule like `alert if elapsedTimeSinceLastCompletedCheckpoint > N minutes`. It follows the good alerting pattern of "time since last success". We found the `elapsedTimeSinceLastCompletedCheckpoint` metric very easy and intuitive to set up an alert against.

*What about existing checkpoint metrics?*

`numberOfCompletedCheckpoints`. We can set up an alert like `alert if derivative(numberOfCompletedCheckpoints) == 0 for N minutes`. However, it is an anti-pattern for our alerting system, as it looks for the absence of a good signal (vs an explicit bad signal). Such an anti-pattern is more prone to false alarms when there is an occasional metric drop or an alerting-system processing issue.

`numberOfFailedCheckpoints`. That is an explicit failure signal, which is good. We can set up an alert like `alert if derivative(numberOfFailedCheckpoints) > 0 in X out of Y minutes`. We have some high-parallelism, large-state jobs. Their normal checkpoint duration is under 1-2 minutes. However, when recovering from an outage with a large backlog, subtasks on one or a few containers sometimes experience very high back pressure, and it can take the checkpoint barrier more than an hour to travel through the DAG to those heavily back-pressured subtasks. The back pressure is likely caused by the multi-tenant environment and performance variation among containers. Instead of letting the checkpoint time out in this case, we decided to increase the checkpoint timeout to a very long value (like 2 hours). With that, we kind of missed the explicit "bad" signal of a failed/timed-out checkpoint.

In theory, one could argue that we could set the checkpoint timeout to infinity: it is always better to have a long but completed checkpoint than a timed-out checkpoint, because a timed-out checkpoint gives up its position in the queue and the new checkpoint starts again at the end of the queue. Note that we are using at-least-once checkpoint semantics, so there is no barrier alignment concern. FLIP-76 (unaligned checkpoints) can help checkpoints deal with back pressure better, but it is not ready yet and also has its limitations. That is a separate discussion.

  was:
like to discuss the value of a new checkpoint Guage metric: 
`elapsedTimeSinceLastCompletedCheckpoint`. Main motivation is for alerting. I 
know reasons below are somewhat related to our setup. Hence want to explore the 
interest of the community.

 

*What do we want to achieve?*

We want to alert if no successful checkpoint happened for a specific period. 
With this new metric, we can set up a simple alerting rule like `alert if 
elapsedTimeSinceLastCompletedCheckpoint > N minutes`. It is a good alerting 
pattern of `time since last success`. We found 
`elapsedTimeSinceLastCompletedCheckpoint` very intuitive to set up alert 
against.

 

*What out existing checkpoint metrics?*

`numberOfCompletedCheckpoints`. We can set up an alert like `alert if 
derivative(numberOfCompletedCheckpoints) == 0 for N minutes`. However, it is an 
anti-pattern for our alerting system, as it is looking for lack of good signal 
(vs explicit bad signal). Such an anti-pattern is easier to suffer false alarm 
problem when there is occasional metric drop or alerting system processing 
issue.

 

`numberOfFailedCheckpoints`. That is an explicit failure signal, which is good. 
We can set up alert like `alert if derivative(numberOfFailedCheckpoints) > 0 in 
X out Y minutes`. We have some high-parallelism large-state jobs. Their normal 
checkpoint duration is <1-2 minutes. However, when recovering from an outage 
with large backlog, sometimes subtasks from one or a few containers experienced 
super high back pressure. It took checkpoint barrier sometimes more than an 
hour to travel through the DAG to those heavy back pressured subtasks. Causes 
of the back pressure are likely due to multi-tenancy environment and 
performance variation among containers. Instead of letting checkpoint to time 
out in this case, we decided to increase checkpoint timeout value to crazy long 
value (like 2 hours). With that, we kind of missed the explicit "bad" signal of 
failed/timed out checkpoint.

 

In theory, one could argue that we can set checkpoint timeout to infinity. It 
is always better to have a long but completed checkpoint than a timed out 
checkpoint, as timed out checkpoint basicall

[jira] [Commented] (FLINK-17571) A better way to show the files used in currently checkpoints

2020-05-08 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103047#comment-17103047
 ] 

Steven Zhen Wu commented on FLINK-17571:


+1.

We are also thinking about how to do GC with incremental checkpoints. Basically, it is like Java GC: tracing live files from the roots (the retained external checkpoints).

> A better way to show the files used in currently checkpoints
> 
>
> Key: FLINK-17571
> URL: https://issues.apache.org/jira/browse/FLINK-17571
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> Inspired by the 
> [userMail|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Shared-Checkpoint-Cleanup-and-S3-Lifecycle-Policy-tt34965.html]
> Currently, there are [three types of 
> directory|https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure]
>  for a checkpoint, the files in TASKOWND and EXCLUSIVE directory can be 
> deleted safely, but users can't delete the files in the SHARED directory 
> safely(the files may be created a long time ago).
> I think it's better to give users a better way to know which files are 
> currently used(so the others are not used)
> maybe a command-line command such as below is ok enough to support such a 
> feature.
> {{./bin/flink checkpoint list $checkpointDir  # list all the files used in 
> checkpoint}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17571) A better way to show the files used in currently checkpoints

2020-05-09 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103518#comment-17103518
 ] 

Steven Zhen Wu commented on FLINK-17571:


[~pnowojski] what is the intended usage of the remove command?

Please correct my understanding of incremental checkpoints.
 * It removes S3 files when the reference count reaches zero. Normally, there shouldn't be orphaned checkpoint files lingering around. In some rare cases the reference-count-based cleanup may not happen or succeed, so there is a small chance of orphaned files here.
 * We don't always restore from an external checkpoint and continue the same checkpoint lineage. E.g., we can restore from a savepoint or from empty state. Those abandoned checkpoint lineages can leave significant garbage behind.

Here is what I am thinking for the GC (a rough sketch of step 3 follows below):
 # Trace from the roots (the retained external checkpoints) to find all live files.
 # Find all files in the S3 bucket/prefix. I heard S3 can send a daily inventory report, so we don't have to list objects ourselves.
 # Find the diff and remove the non-live files (with some safety threshold, like only files older than 30 days).
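
A minimal sketch of the diff step described above, using plain Java collections. The live set, the listing, and the class/method names are stand-ins: in practice the listing would come from an S3 inventory report and the actual deletion would go through the object store's client, neither of which is shown here.

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CheckpointGarbageCollector {

    /** Safety threshold: never delete files younger than this, even if they look unreferenced. */
    private static final Duration MIN_AGE = Duration.ofDays(30);

    /**
     * @param liveFiles file keys reachable from the retained external checkpoints (step 1)
     * @param allFiles  all file keys under the checkpoint bucket/prefix with their last-modified
     *                  time, e.g. parsed from an S3 inventory report (step 2)
     * @param now       current time, passed in for testability
     * @return file keys that are candidates for deletion (step 3)
     */
    public static List<String> findGarbage(Set<String> liveFiles, Map<String, Instant> allFiles, Instant now) {
        return allFiles.entrySet().stream()
                .filter(e -> !liveFiles.contains(e.getKey()))                            // not referenced by any live checkpoint
                .filter(e -> Duration.between(e.getValue(), now).compareTo(MIN_AGE) > 0) // older than the safety threshold
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
{code}

The returned keys would then be deleted through the object store's API, ideally with a dry-run mode first.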

> A better way to show the files used in currently checkpoints
> 
>
> Key: FLINK-17571
> URL: https://issues.apache.org/jira/browse/FLINK-17571
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> Inspired by the 
> [userMail|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Shared-Checkpoint-Cleanup-and-S3-Lifecycle-Policy-tt34965.html]
> Currently, there are [three types of 
> directory|https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure]
>  for a checkpoint, the files in TASKOWND and EXCLUSIVE directory can be 
> deleted safely, but users can't delete the files in the SHARED directory 
> safely(the files may be created a long time ago).
> I think it's better to give users a better way to know which files are 
> currently used(so the others are not used)
> maybe a command-line command such as below is ok enough to support such a 
> feature.
> {{./bin/flink checkpoint list $checkpointDir  # list all the files used in 
> checkpoint}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17571) A better way to show the files used in currently checkpoints

2020-05-09 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103518#comment-17103518
 ] 

Steven Zhen Wu edited comment on FLINK-17571 at 5/9/20, 10:59 PM:
--

[~pnowojski] what is the intended usage of the remove command?

Please correct my understanding of incremental checkpoints.
 * It removes S3 files when the reference count reaches zero. Normally, there shouldn't be orphaned checkpoint files lingering around. In some rare cases the reference-count-based cleanup may not happen or succeed, so there is a small chance of orphaned files here.
 * We don't always restore from an external checkpoint and continue the same checkpoint lineage (with incremental checkpoints and reference counting). E.g., we can restore from a savepoint or from empty state. Those abandoned checkpoint lineages can leave significant garbage behind.

Here is what I am thinking for the GC:
 # Trace from the roots (the retained external checkpoints) to find all live files.
 # Find all files in the S3 bucket/prefix. I heard S3 can send a daily inventory report, so we don't have to list objects ourselves.
 # Find the diff and remove the non-live files (with some safety threshold, like only files older than 30 days).


was (Author: stevenz3wu):
[~pnowojski] what is the usage of the remove command?

Please correct my understanding on incremental checkpoint.
 * It removes S3 files when reference count reaching zero. Normally, there 
shouldn't be orphaned checkpoint files lingering around. Maybe in some rare 
cases, reference count based cleanup didn't happen or succeed. so there is a 
small chance of orphaned files here.
 * We don't always restore from external checkpoint and continue the same 
checkpoint lineage. E.g. we can restore from a savepoint or empty state. Then 
those abandoned checkpoint lineages can leave significant garbage behind. 

here is what I am thinking about the GC
 # trace from root of retained external checkpoints to find all live files
 # Find all files in S3 bucket/prefix. I heard S3 can send daily report and we 
don't have to list objects
 # find the diff and remove the non live files (with some safety threshold like 
older than 30 days)

> A better way to show the files used in currently checkpoints
> 
>
> Key: FLINK-17571
> URL: https://issues.apache.org/jira/browse/FLINK-17571
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing
>Reporter: Congxian Qiu(klion26)
>Priority: Major
>
> Inspired by the 
> [userMail|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Shared-Checkpoint-Cleanup-and-S3-Lifecycle-Policy-tt34965.html]
> Currently, there are [three types of 
> directory|https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure]
>  for a checkpoint, the files in TASKOWND and EXCLUSIVE directory can be 
> deleted safely, but users can't delete the files in the SHARED directory 
> safely(the files may be created a long time ago).
> I think it's better to give users a better way to know which files are 
> currently used(so the others are not used)
> maybe a command-line command such as below is ok enough to support such a 
> feature.
> {{./bin/flink checkpoint list $checkpointDir  # list all the files used in 
> checkpoint}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16444) Count the read/write/seek/next latency of RocksDB as metrics

2020-03-07 Thread Steven Zhen Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17054175#comment-17054175
 ] 

Steven Zhen Wu commented on FLINK-16444:


+1. this is super useful if RocksDB / disk IO becomes bottleneck

> Count the read/write/seek/next latency of RocksDB as metrics
> 
>
> Key: FLINK-16444
> URL: https://issues.apache.org/jira/browse/FLINK-16444
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yun Tang
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently, user cannot know the read/write/seek/next latency of RocksDB, we 
> could add these helpful metrics to know the overall state performance. To not 
> affect the action performance much, we could introduce counter to only record 
> the latency at interval of some actions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (FLINK-15907) expose getRawValue as protected in Configuration so that user can override it

2020-03-11 Thread Steven Zhen Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu reopened FLINK-15907:


[~aljoscha]  [~dwysakowicz]  I'd like to reopen the discussion.

I mentioned in the PR that we can work around this by transferring properties into the Flink {{Configuration}} by calling {{Configuration#setString(String key, String value)}}.

However, it would be nicer and easier for us to extend if Flink exposed `getRawValue` or `ReadableConfigToConfigurationAdapter`; then we wouldn't have to copy properties.
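
A minimal sketch of the copy-based workaround mentioned above. The source of the properties is shown as a plain `Map`; the actual Archaius lookup is elided, and the helper class name is made up:

{code:java}
import java.util.Map;
import org.apache.flink.configuration.Configuration;

public final class ConfigurationBridge {

    private ConfigurationBridge() {}

    /**
     * Copies externally resolved properties (e.g. from a dynamic config system)
     * into a Flink Configuration via the public setString API.
     */
    public static Configuration toFlinkConfiguration(Map<String, String> resolvedProperties) {
        Configuration configuration = new Configuration();
        resolvedProperties.forEach(configuration::setString);
        return configuration;
    }
}
{code}

The drawback is that the values are copied once up front instead of each lookup being forwarded to the external config source, which is what overriding `getRawValue` would allow.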

 

> expose getRawValue as protected in Configuration so that user can override it
> -
>
> Key: FLINK-15907
> URL: https://issues.apache.org/jira/browse/FLINK-15907
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Configuration
>Affects Versions: 1.9.2
>Reporter: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> We use Archaius for configuration internally: 
> https://github.com/Netflix/archaius
> It will be nice to expose this methods as *_protected_* so that we can 
> override and forward to Archaius.
> {code}
> private Optional getRawValue(String key) 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20871) Make DataStream#executeAndCollectWithClient public

2021-01-06 Thread Steven Zhen Wu (Jira)
Steven Zhen Wu created FLINK-20871:
--

 Summary: Make DataStream#executeAndCollectWithClient public
 Key: FLINK-20871
 URL: https://issues.apache.org/jira/browse/FLINK-20871
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.12.0
Reporter: Steven Zhen Wu


Right now, `DataStreamUtils#collectWithClient` is marked as deprecated in favor of `DataStream#executeAndCollect()`. However, some integration tests (e.g. [FileSourceTextLinesITCase](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java#L187)) need the `DataStream#executeAndCollectWithClient` API to get a JobClient so they can cancel the job after collecting the required output in unbounded-source tests.
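
For illustration, a rough sketch of the test pattern in question, written against the deprecated but still-public `DataStreamUtils#collectWithClient`. The exact package locations, the `ClientAndIterator` field names, and the job name are recalled from the 1.12 APIs and may not match the current code exactly; the record type and count are placeholders.

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;

public final class CollectAndCancel {

    private CollectAndCancel() {}

    /** Collects {@code expectedCount} records from an unbounded stream, then cancels the job. */
    public static List<String> collectThenCancel(DataStream<String> stream, int expectedCount)
            throws Exception {
        ClientAndIterator<String> clientAndIterator =
                DataStreamUtils.collectWithClient(stream, "unbounded-source-it-case");

        List<String> collected = new ArrayList<>(expectedCount);
        while (collected.size() < expectedCount && clientAndIterator.iterator.hasNext()) {
            collected.add(clientAndIterator.iterator.next());
        }

        // The unbounded job never finishes on its own, so cancel it via the JobClient.
        clientAndIterator.client.cancel().get();
        return collected;
    }
}
{code}

Making `executeAndCollectWithClient` public would let such tests use the non-deprecated entry point instead.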



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

