[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855488#comment-15855488
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:57 AM:


The collector can be made thread-safe, yes, depending on the implementation. But 
for the synchronization to work properly, user implementations of 
{{deserialize(bytes, collector)}} will need to make sure that all outputs are 
added to the collector before returning from the method.
 
Also note that the underlying implementation might differ between 0.8 
and 0.9+, mainly due to the different threading models for how partitions are 
consumed.
For 0.8, I think we need a separate collector for each subscribed Kafka 
partition, while in 0.9+ we can have a single collector for the whole subtask.
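
For illustration, a thread-safe collector could look roughly like this (a 
sketch only, not the actual implementation; all names are illustrative):

{code}
import java.util.ArrayList;
import java.util.List;

// Sketch of a thread-safe collector that buffers outputs under the
// checkpoint lock, so the fetcher can flush them atomically with the
// offset update.
public class SynchronizedCollector<T> {

    private final Object checkpointLock;
    private final List<T> buffer = new ArrayList<>();

    public SynchronizedCollector(Object checkpointLock) {
        this.checkpointLock = checkpointLock;
    }

    public void collect(T record) {
        synchronized (checkpointLock) {
            buffer.add(record);
        }
    }

    // drained by the fetcher while holding the same lock
    public List<T> drainBuffer() {
        synchronized (checkpointLock) {
            List<T> drained = new ArrayList<>(buffer);
            buffer.clear();
            return drained;
        }
    }
}
{code}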


was (Author: tzulitai):
The collector can be made thread-safe, yes, depending on the implementation. But 
for the synchronization to work properly, user implementations of 
{{deserialize(bytes, collector)}} will need to make sure that all outputs are 
added to the collector before returning from the method.
 
Also note that for the underlying implementation I think we should have a 
separate collector for each subscribed Kafka partition. A collector cannot be 
shared for multiple partitions.

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-5702) Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is compromised

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-5702:
--

Assignee: Tzu-Li (Gordon) Tai

> Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is 
> compromised
> -
>
> Key: FLINK-5702
> URL: https://issues.apache.org/jira/browse/FLINK-5702
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> The documentation for FlinkKafkaProducer does not have any information about 
> {{setLogFailuresOnly}}. It should emphasize that if users choose to only 
> log failures instead of failing the sink, at-least-once cannot be guaranteed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855488#comment-15855488
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:47 AM:


The collector can be made thread-safe, yes, depending on the implementation. But 
for the synchronization to work properly, user implementations of 
{{deserialize(bytes, collector)}} will need to make sure that all outputs are 
added to the collector before returning from the method.
 
Also note that for the underlying implementation I think we should have a 
separate collector for each subscribed Kafka partition. A collector cannot be 
shared for multiple partitions.


was (Author: tzulitai):
The collector can be made thread-safe, yes, depending on the implementation. But 
for the synchronization to work properly, user implementations of 
`deserialize()` will need to make sure that all outputs are added to the 
collector before returning from the method.
 
Also note that for the underlying implementation I think we should have a 
separate collector for each subscribed Kafka partition. A collector cannot be 
shared for multiple partitions.

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855488#comment-15855488
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:47 AM:


The collector can be made thread-safe, yes, depending on the implementation. But 
for the synchronization to work properly, user implementations of 
`deserialize()` will need to make sure that all outputs are added to the 
collector before returning from the method.
 
Also note that for the underlying implementation I think we should have a 
separate collector for each subscribed Kafka partition. A collector cannot be 
shared for multiple partitions.


was (Author: tzulitai):
The collector can be made thread-safe, yes, depending on the implementation.
Also note that for the underlying implementation I think we should have a 
separate collector for each subscribed Kafka partition. A collector cannot be 
shared for multiple partitions.

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855488#comment-15855488
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5583:


The collector can be made thread-safe, yes, depending on the implementation.
Also note that for the underlying implementation I think we should have a 
separate collector for each subscribed Kafka partition. A collector cannot be 
shared for multiple partitions.

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3282: [FLINK-5702] [doc] At-least-once configuration inf...

2017-02-06 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3282

[FLINK-5702] [doc] At-least-once configuration info for FlinkKafkaProducer

Adds information about the `setLogFailuresOnly` and `setFlushOnCheckpoint` 
methods with regard to the at-least-once guarantees of the producer, and warns 
that at-least-once is compromised if they are configured inappropriately.
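
For reference, a minimal sketch of the configuration these docs describe (the 
two setters exist on the producer; the surrounding setup is illustrative):

```java
// Illustrative at-least-once setup; topic name and schema are placeholders.
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
        "out-topic", new SimpleStringSchema(), properties);

producer.setLogFailuresOnly(false);   // fail the sink on send errors instead of only logging them
producer.setFlushOnCheckpoint(true);  // flush in-flight records before checkpoints complete

stream.addSink(producer);
```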

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-5702

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3282


commit b90a5db99802d8d483ad064f21f8a3fbe42c4076
Author: Tzu-Li (Gordon) Tai 
Date:   2017-02-07T06:32:28Z

[FLINK-5702] [doc] At-least-once configuration info for FlinkKafkaProducer




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5702) Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is compromised

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855483#comment-15855483
 ] 

ASF GitHub Bot commented on FLINK-5702:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3282

[FLINK-5702] [doc] At-least-once configuration info for FlinkKafkaProducer

Adds information about the `setLogFailuresOnly` and `setFlushOnCheckpoint` 
methods with regard to the at-least-once guarantees of the producer, and warns 
that at-least-once is compromised if they are configured inappropriately.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-5702

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3282


commit b90a5db99802d8d483ad064f21f8a3fbe42c4076
Author: Tzu-Li (Gordon) Tai 
Date:   2017-02-07T06:32:28Z

[FLINK-5702] [doc] At-least-once configuration info for FlinkKafkaProducer




> Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is 
> compromised
> -
>
> Key: FLINK-5702
> URL: https://issues.apache.org/jira/browse/FLINK-5702
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> The documentation for FlinkKafkaProducer does not have any information about 
> {{setLogFailuresOnly}}. It should emphasize that if users choose to only 
> log failures instead of failing the sink, at-least-once cannot be guaranteed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5728) FlinkKafkaProducer should flush on checkpoint by default

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-5728:
--

 Summary: FlinkKafkaProducer should flush on checkpoint by default
 Key: FLINK-5728
 URL: https://issues.apache.org/jira/browse/FLINK-5728
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Reporter: Tzu-Li (Gordon) Tai


As discussed in FLINK-5702, it might be a good idea to let the 
FlinkKafkaProducer flush on checkpoints by default. Currently, it is disabled 
by default.

It's a very simple change, but we should think about whether we want to break 
existing user behaviour or provide a proper usage migration.
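
Until the default changes, users have to opt in explicitly via the existing 
setter on the producer:

{code}
// current opt-in behaviour; this issue proposes making it the default
producer.setFlushOnCheckpoint(true);
{code}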



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855478#comment-15855478
 ] 

Haohui Mai commented on FLINK-5583:
---

The interface looks good to me. It looks like you want to have the 
deserialization outside of the lock. Is it okay to assume that the collector is 
thread-safe?

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855466#comment-15855466
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5583:


Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

```
public interface NewDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
```

Something like the above (ignore the dummy name, we can think of a better one 
:-D).
The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
consumer state, as a single atomic operation synchronized on the checkpoint 
lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a 
checkpoint barrier will only come either after or before all the produced 
records of offset 32.
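
In code, steps 2-4 might look roughly like this (a minimal sketch; all names 
are illustrative, not the actual fetcher code):

{code}
// Minimal sketch of steps 2-4; names are illustrative.
List<T> buffer = new ArrayList<>();
// step 2: the user's deserialization adds zero or more records to the buffer
// (assuming OutputCollector is a simple functional collect(T) interface)
deserializationSchema.deserialize(recordBytes, buffer::add);

synchronized (checkpointLock) {
    // step 3: flush the buffered records and update the offset atomically
    for (T element : buffer) {
        sourceContext.collect(element);
    }
    partitionState.setOffset(32L); // the offset processed in step 1
    // step 4: checkpoints take the same lock, so a barrier can never fall
    // in between the emitted records and the offset update
}
{code}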

For the synchronization explained above, we do not need to expose another 
{{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is 
that it must be a blocking call, which might be OK depending on the frequency 
of corrupt messages. What do you think [~wheat9]?
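
For illustration, a hypothetical user implementation along those lines 
({{decodeAvro}} and {{writeToErrorStore}} are placeholder helpers, not Flink 
APIs):

{code}
// Hypothetical implementation of the proposed interface above.
// (getProducedType() from ResultTypeQueryable is omitted for brevity.)
public class SkipCorruptAvroSchema implements NewDeserializationSchema<MyRecord> {

    @Override
    public void deserialize(byte[] message, OutputCollector<MyRecord> collector) {
        try {
            // all outputs must be added to the collector before returning
            collector.collect(decodeAvro(message));
        } catch (Exception e) {
            // blocking side-write of the corrupt bytes; the record is skipped
            writeToErrorStore(message);
        }
    }

    private MyRecord decodeAvro(byte[] bytes) { /* Avro decoding omitted */ return null; }

    private void writeToErrorStore(byte[] bytes) { /* e.g. append to HDFS, omitted */ }
}
{code}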

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855466#comment-15855466
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:21 AM:


Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method; some users might just want to drop the 
corrupt record).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one 
:-D).

The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
internal consumer state, as a single atomic operation synchronized on the 
checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a 
checkpoint barrier will only come either after or before all the produced 
records of offset 32, and not in-between.

For the synchronization explained above, we do not need to expose another 
{{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is 
that it must be a blocking call, which might be OK depending on the frequency 
of corrupt messages. What do you think [~wheat9]?


was (Author: tzulitai):
Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one 
:-D).

The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
internal consumer state, as a single atomic operation synchronized on the 
checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a 
checkpoint barrier will only come either after or before all the produced 
records of offset 32, and not in-between.

For the synchronization explained above, we do not need to expose another 
{{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is 
that it must be a blocking call, which might be OK depending on the frequency 
of corrupt messages. What do you think [~wheat9]?

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855466#comment-15855466
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:20 AM:


Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one 
:-D).

The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
internal consumer state, as a single atomic operation synchronized on the 
checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a 
checkpoint barrier will only come either after or before all the produced 
records of offset 32, and not in-between.

For the synchronization explained above, we do not need to expose another 
{{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is 
that it must be a blocking call, which might be OK depending on the frequency 
of corrupt messages. What do you think [~wheat9]?


was (Author: tzulitai):
Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one 
:-D).

The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
internal consumer state, as a single atomic operation synchronized on the 
checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a 
checkpoint barrier will only come either after or before all the produced 
records of offset 32.

For the synchronization explained above, we do not need to expose another 
{{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is 
that it must be a blocking call, which might be OK depending on the frequency 
of corrupt messages. What do you think [~wheat9]?

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855466#comment-15855466
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:17 AM:


Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one 
:-D).
The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
consumer state, as a single atomic operation synchronized on the checkpoint 
lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a 
checkpoint barrier will only come either after or before all the produced 
records of offset 32.

For the synchronization explained above, we do not need to expose another 
{{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is 
that it must be a blocking call, which might be OK depending on the frequency 
of corrupt messages. What do you think [~wheat9]?


was (Author: tzulitai):
Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

```
public interface NewDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
```

Something like the above (ignore the dummy name, we can think of a better one 
:-D).
The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
consumer state, as a single atomic operation synchronized on the checkpoint 
lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a 
checkpoint barrier will only come either after or before all the produced 
records of offset 32.

For the synchronization explained above, we do not need to expose another 
{{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is 
that it must be a blocking call, which might be OK depending on the frequency 
of corrupt messages. What do you think [~wheat9]?

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855466#comment-15855466
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:19 AM:


Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one 
:-D).

The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
internal consumer state, as a single atomic operation synchronized on the 
checkpoint lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a 
checkpoint barrier will only come either after or before all the produced 
records of offset 32.

For the synchronization explained above, we do not need to expose another 
{{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is 
that it must be a blocking call, which might be OK depending on the frequency 
of corrupt messages. What do you think [~wheat9]?


was (Author: tzulitai):
Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one 
:-D).

The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
consumer state, as a single atomic operation synchronized on the checkpoint 
lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a 
checkpoint barrier will only come either after or before all the produced 
records of offset 32.

For the synchronization explained above, we do not need to expose another 
{{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is 
that it must be a blocking call, which might be OK depending on the frequency 
of corrupt messages. What do you think [~wheat9]?

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855466#comment-15855466
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:18 AM:


Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one 
:-D).

The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
consumer state, as a single atomic operation synchronized on the checkpoint 
lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a 
checkpoint barrier will only come either after or before all the produced 
records of offset 32.

For the synchronization explained above, we do not need to expose another 
{{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is 
that it must be a blocking call, which might be OK depending on the frequency 
of corrupt messages. What do you think [~wheat9]?


was (Author: tzulitai):
Ah, I see what you mean. For your use case it makes sense, but I don't think 
this is necessary for general use cases (especially the 
{{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal 
flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
    // user uses collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one 
:-D).
The way it would work is:
1. Consumer starts processing record with offset 32 (example).
2. Have an internal buffer in the consumer to collect the zero or more records 
produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into 
consumer state, as a single atomic operation synchronized on the checkpoint 
lock.
4. The checkpointing is synchronized on the lock, so we can make sure that a 
checkpoint barrier will only come either after or before all the produced 
records of offset 32.

For the synchronization explained above, we do not need to expose another 
{{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external 
storage, you would do that with a try-catch block in your implementation of 
{{deserialization.deserialize(bytes, collector)}}. The only limitation here is 
that it must be a blocking call, which might be OK depending on the frequency 
of corrupt messages. What do you think [~wheat9]?

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855424#comment-15855424
 ] 

Haohui Mai commented on FLINK-5583:
---

The pseudo-code looks like the following:

{code}
void invoke() {
  try {
    Tuple2<K, V> kv = deserialization.deserialize(bytes);
  } catch (Throwable t) {
    // Write the corrupted message to external sources (e.g., Kafka, HDFS)
    deserialization.writeToExternalSources(bytes);
  }
}

void checkpoint() {
  ...
  deserialization.flush();
}
{code}

Strictly speaking, the {{DeserializationSchema}} does not need to have 
persistent state (which IMO is the right thing to do). However, it does require 
proper synchronization when the consumer checkpoints. Does it make sense to 
you [~tzulitai]?

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3274: [FLINK-5723][UI]Use Used instead of Initial to make taskm...

2017-02-06 Thread WangTaoTheTonic
Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3274
  
@zentol Thanks for the review. I've changed the .jade file, but it looks like 
CI is not coming up properly :(


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5723) Use "Used" instead of "Initial" to make taskmanager tag more readable

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855412#comment-15855412
 ] 

ASF GitHub Bot commented on FLINK-5723:
---

Github user WangTaoTheTonic commented on the issue:

https://github.com/apache/flink/pull/3274
  
@zentol Thanks for the review. I've changed the .jade file, but it looks like 
CI is not coming up properly :(


> Use "Used" instead of "Initial" to make taskmanager tag more readable
> -
>
> Key: FLINK-5723
> URL: https://issues.apache.org/jira/browse/FLINK-5723
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Tao Wang
>Priority: Trivial
>
> Now in the JobManager web frontend, the used memory of task managers is 
> presented as "Initial" in the table header, which, judging from the code, 
> actually means "memory used".
> I'd like to change it to be more readable, even if it is a trivial change.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3281: [FLINK-5727] [table] Unify some API of batch and s...

2017-02-06 Thread KurtYoung
GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/3281

[FLINK-5727] [table] Unify some API of batch and stream TableEnvironment

This PR contains 3 changes:

1. move `sql` to base class
2. unify `scan` and `ingest` to `scan`, and move to base class
3. move `registerTableSource` to base class and change the parameter type 
from specific `BatchTableSource` or `StreamTableSource` to their base class: 
`TableSource`
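
A rough sketch of the resulting base-class surface, as assumed from the list 
above (signatures only; the actual diff may differ):

```java
// Assumed shape of the unified TableEnvironment base class (sketch only).
public abstract class TableEnvironment {

    // 1. `sql` moves up from both subclasses
    public abstract Table sql(String query);

    // 2. `scan` replaces both batch `scan` and streaming `ingest`
    public abstract Table scan(String tableName);

    // 3. `registerTableSource` now accepts the common base type
    public abstract void registerTableSource(String name, TableSource<?> source);
}
```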

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-5727

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3281.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3281


commit 8303b0c1c6320713daf212c6da4d72499c459e42
Author: Kurt Young 
Date:   2017-02-07T06:15:02Z

[FLINK-5727] [table] Unify some API of batch and stream TableEnvironment




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5727) Unify some API of batch and stream TableEnvironment

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855407#comment-15855407
 ] 

ASF GitHub Bot commented on FLINK-5727:
---

GitHub user KurtYoung opened a pull request:

https://github.com/apache/flink/pull/3281

[FLINK-5727] [table] Unify some API of batch and stream TableEnvironment

This PR contains 3 changes:

1. move `sql` to base class
2. unify `scan` and `ingest` to `scan`, and move to base class
3. move `registerTableSource` to base class and change the parameter type 
from specific `BatchTableSource` or `StreamTableSource` to their base class: 
`TableSource`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/KurtYoung/flink flink-5727

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3281.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3281


commit 8303b0c1c6320713daf212c6da4d72499c459e42
Author: Kurt Young 
Date:   2017-02-07T06:15:02Z

[FLINK-5727] [table] Unify some API of batch and stream TableEnvironment




> Unify some API of batch and stream TableEnvironment
> ---
>
> Key: FLINK-5727
> URL: https://issues.apache.org/jira/browse/FLINK-5727
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Kurt Young
>
> The implementations of some APIs in the current {{BatchTableEnvironment}} and 
> {{StreamTableEnvironment}} are identical, like {{sql}}. Other APIs, like 
> {{registerTableSource}} and {{scan}}, can be unified into the base class. (We 
> can also change {{ingest}} in {{StreamTableEnvironment}} to {{scan}}.)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3111: [FLINK-3475] [Table] DISTINCT aggregate function support ...

2017-02-06 Thread docete
Github user docete commented on the issue:

https://github.com/apache/flink/pull/3111
  
Agree. If we have more than one distinct agg with groupings, doing the 
partitioning first and reusing the subsets would improve performance. 
Could we merge this PR first and create another JIRA to track the grouping 
cases? 
We need a workaround to support distinct aggs ASAP.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3475) DISTINCT aggregate function support for SQL queries

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855397#comment-15855397
 ] 

ASF GitHub Bot commented on FLINK-3475:
---

Github user docete commented on the issue:

https://github.com/apache/flink/pull/3111
  
Agree. If we have more than one distinct agg with groupings, doing the 
partitioning first and reusing the subsets would improve performance. 
Could we merge this PR first and create another JIRA to track the grouping 
cases? 
We need a workaround to support distinct aggs ASAP.


> DISTINCT aggregate function support for SQL queries
> ---
>
> Key: FLINK-3475
> URL: https://issues.apache.org/jira/browse/FLINK-3475
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Chengxiang Li
>Assignee: Zhenghua Gao
>
> DISTINCT aggregate function may be able to reuse the aggregate function 
> instead of separate implementation, and let Flink runtime take care of 
> duplicate records.
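
For illustration, the kind of SQL query this feature enables (a hypothetical 
example, not from the issue; table and column names are placeholders):

{code}
// COUNT(DISTINCT ...) in a SQL query over a registered table
Table result = tableEnv.sql(
    "SELECT user, COUNT(DISTINCT product) FROM Orders GROUP BY user");
{code}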



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855383#comment-15855383
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
@fhueske - A gentle reminder !!!


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3149: FLINK-2168 Add HBaseTableSource

2017-02-06 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/3149
  
@fhueske - A gentle reminder !!!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5727) Unify some API of batch and stream TableEnvironment

2017-02-06 Thread Kurt Young (JIRA)
Kurt Young created FLINK-5727:
-

 Summary: Unify some API of batch and stream TableEnvironment
 Key: FLINK-5727
 URL: https://issues.apache.org/jira/browse/FLINK-5727
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Kurt Young
Assignee: Kurt Young


The implementations of some APIs in the current {{BatchTableEnvironment}} and 
{{StreamTableEnvironment}} are identical, like {{sql}}. Other APIs, like 
{{registerTableSource}} and {{scan}}, can be unified into the base class. (We 
can also change {{ingest}} in {{StreamTableEnvironment}} to {{scan}}.)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5702) Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is compromised

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855325#comment-15855325
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5702:


[~StephanEwen] Good point about changing the default. In that case, though, we 
might also need to think about migrating the {{FlinkKafkaProducerBase}}, as 
changing the default is a behaviour-breaking change for users. I'll open a 
separate JIRA for this.

> Kafka Producer docs should warn if using setLogFailuresOnly, at-least-once is 
> compromised
> -
>
> Key: FLINK-5702
> URL: https://issues.apache.org/jira/browse/FLINK-5702
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>
> The documentation for FlinkKafkaProducer does not have any information about 
> {{setLogFailuresOnly}}. It should emphasize that if users choose to only 
> log failures instead of failing the sink, at-least-once cannot be guaranteed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5704) Deprecate FlinkKafkaConsumer constructors in favor of improvements to decoupling from Kafka offset committing

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855319#comment-15855319
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5704:


FLINK-3679 and FLINK-5583 would probably change the {{DeserializationSchema}} 
interface. We should include that as part of the Kafka consumer usage migration.

> Deprecate FlinkKafkaConsumer constructors in favor of improvements to 
> decoupling from Kafka offset committing
> -
>
> Key: FLINK-5704
> URL: https://issues.apache.org/jira/browse/FLINK-5704
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> With FLINK-3398 and FLINK-4280, the {{FlinkKafkaConsumer}} will be able to 
> completely operate independently of committed offsets in Kafka.
> I.e.,
> (1) *Starting position*: when starting, the consumer can choose to not use 
> any committed offsets in Kafka as the starting position
> (2) *Committing offsets back to Kafka*: the consumer can completely opt-out 
> of committing offsets back to Kafka
> However, our current default behaviour for (1) is to respect committed 
> offsets, and (2) is to always have offset committing. Users still have to 
> call the respective setter configuration methods to change this.
> I think we should deprecate the current constructors in favor of new ones 
> with default behaviours (1) start from the latest record, without respecting 
> Kafka offsets, and (2) don't commit offsets.
> With this change, users explicitly call the config methods of FLINK-3398 and 
> FLINK-4280 to *enable* respecting committed offsets for Kafka, instead of 
> _disabling_ it. They would want to / need to enable it, only when perhaps to 
> migrate from a non-Flink consuming application, or they wish to expose the 
> internal checkpointed offsets to measure consumer lag using Kafka toolings.
> The main advantage for this change is that the API of {{FlinkKafkaConsumer}} 
> can speak for itself that it does not depend on committed offsets in Kafka 
> (this is a misconception that users frequently have), and that exactly-once 
> depends solely on offsets checkpointed internally using Flink's checkpointing 
> mechanics.
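
For illustration, how consumer code would read under the proposed defaults, 
assuming the setter names introduced by FLINK-3398 and FLINK-4280 (the broker 
address, group id, and topic below are made up):

{code:java}
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ConsumerOffsetConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // made up
        props.setProperty("group.id", "my-group");                // made up

        FlinkKafkaConsumer09<String> consumer = new FlinkKafkaConsumer09<>(
                "input-topic", new SimpleStringSchema(), props);

        // Proposed defaults: start from the latest record and do not commit
        // offsets, so the common case needs no extra calls.

        // Users who migrate from a non-Flink consumer, or who want to expose
        // consumer lag via Kafka tooling, would *enable* these explicitly:
        consumer.setStartFromGroupOffsets();          // respect committed offsets
        consumer.setCommitOffsetsOnCheckpoints(true); // commit offsets to Kafka
    }
}
{code}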



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855313#comment-15855313
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5583:


Regarding breaking user code when changing to a flatMap-like 
{{DeserializationSchema}}: I would actually prefer to have a new separate 
interface instead of breaking the original one. We could make use of FLINK-5704 
to migrate to the new deserialization interface.
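
For illustration, one shape such a new, separate interface could take (the 
name and exact signature below are made up, not a final proposal):

{code:java}
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.util.Collector;

// Hypothetical flatMap-like deserialization interface: emitting zero records
// skips a corrupted message, and emitting several fans one Kafka record out
// into multiple elements.
public interface CollectingDeserializationSchema<T> extends Serializable {

    void deserialize(byte[] message, Collector<T> out) throws IOException;
}
{code}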

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5726) Add the RocketMQ plugin for the Apache Flink

2017-02-06 Thread Tao Meng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Meng updated FLINK-5726:

Summary: Add the RocketMQ plugin for the Apache Flink  (was: Add the 
RocketMQ plugin for the Apache Spark)

> Add the RocketMQ plugin for the Apache Flink
> 
>
> Key: FLINK-5726
> URL: https://issues.apache.org/jira/browse/FLINK-5726
> Project: Flink
>  Issue Type: Task
>Reporter: Longda Feng
>Priority: Minor
>
> Apache RocketMQ® is an open source distributed messaging and streaming data 
> platform that is used by many companies. Please refer to 
> http://rocketmq.incubator.apache.org/ for more details.
> Since Apache RocketMQ 4.0 will be released in the next few days, we can 
> start the work of adding the RocketMQ plugin for Apache Flink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5726) Add the RocketMQ plugin for the Apache Spark

2017-02-06 Thread Longda Feng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Longda Feng updated FLINK-5726:
---
External issue ID: ROCKETMQ-82

> Add the RocketMQ plugin for the Apache Spark
> 
>
> Key: FLINK-5726
> URL: https://issues.apache.org/jira/browse/FLINK-5726
> Project: Flink
>  Issue Type: Task
>Reporter: Longda Feng
>Priority: Minor
>
> Apache RocketMQ® is an open source distributed messaging and streaming data 
> platform that is used by many companies. Please refer to 
> http://rocketmq.incubator.apache.org/ for more details.
> Since Apache RocketMQ 4.0 will be released in the next few days, we can 
> start the work of adding the RocketMQ plugin for Apache Flink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5726) Add the RocketMQ plugin for the Apache Spark

2017-02-06 Thread Longda Feng (JIRA)
Longda Feng created FLINK-5726:
--

 Summary: Add the RocketMQ plugin for the Apache Spark
 Key: FLINK-5726
 URL: https://issues.apache.org/jira/browse/FLINK-5726
 Project: Flink
  Issue Type: Task
Reporter: Longda Feng
Priority: Minor


Apache RocketMQ® is an open source distributed messaging and streaming data 
platform that is used by many companies. Please refer to 
http://rocketmq.incubator.apache.org/ for more details.


Since Apache RocketMQ 4.0 will be released in the next few days, we can start 
the work of adding the RocketMQ plugin for Apache Flink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855251#comment-15855251
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5583:


Hi [~wheat9],

I'm not quite sure why your case requires the {{DeserializationSchema}} to be 
stateful. Could you elaborate a bit more? What is the state of the 
{{DeserializationSchema}} you have in mind?

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855241#comment-15855241
 ] 

ASF GitHub Bot commented on FLINK-4354:
---

Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2770
  
@tillrohrmann , thank you for the review!

Actually this PR is out of date relative to FLIP-6, and my subsequent 
modifications were not pushed to it. It should be re-submitted based on 
master; I forgot to close it.

Some of the suggestions above have already been addressed in the current 
implementation, and I want to merge this part into #3151 so it can be 
reviewed more easily.


>  Implement TaskManager side of heartbeat from ResourceManager
> -
>
> Key: FLINK-4354
> URL: https://issues.apache.org/jira/browse/FLINK-4354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. 
> The {{TaskManager}} transmits its slot availability with each heartbeat. That 
> way, the RM will always know about available slots.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #2770: [FLINK-4354]Implement TaskManager side of heartbeat from ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2770
  
@tillrohrmann , thank you for the review!

Actually this PR is out of date relative to FLIP-6, and my subsequent 
modifications were not pushed to it. It should be re-submitted based on 
master; I forgot to close it.

Some of the suggestions above have already been addressed in the current 
implementation, and I want to merge this part into #3151 so it can be 
reviewed more easily.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855222#comment-15855222
 ] 

ASF GitHub Bot commented on FLINK-4354:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737255
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ---
@@ -99,6 +101,13 @@ public TaskManagerRunner(
        // Initialize the TM metrics

        TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());

+       HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+           taskManagerConfiguration.getTimeout().toMilliseconds(),
+           resourceID,
+           executor,
+           Executors.newSingleThreadScheduledExecutor(),
--- End diff --

It is already added in new modifications


>  Implement TaskManager side of heartbeat from ResourceManager
> -
>
> Key: FLINK-4354
> URL: https://issues.apache.org/jira/browse/FLINK-4354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. 
> The {{TaskManager}} transmits its slot availability with each heartbeat. That 
> way, the RM will always know about available slots.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855218#comment-15855218
 ] 

ASF GitHub Bot commented on FLINK-4354:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737204
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -1099,4 +1121,23 @@ public void run() {
            });
        }
    }
+
+   private final class ResourceManagerHeartbeatListener implements HeartbeatListener {
+
+       ResourceManagerHeartbeatListener() {
+       }
+
+       @Override
+       public void notifyHeartbeatTimeout(ResourceID resourceID) {
+       }
--- End diff --

It will be added in the new modifications


>  Implement TaskManager side of heartbeat from ResourceManager
> -
>
> Key: FLINK-4354
> URL: https://issues.apache.org/jira/browse/FLINK-4354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. 
> The {{TaskManager}} transmits its slot availability with each heartbeat. That 
> way, the RM will always know about available slots.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737255
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ---
@@ -99,6 +101,13 @@ public TaskManagerRunner(
        // Initialize the TM metrics

        TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());

+       HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+           taskManagerConfiguration.getTimeout().toMilliseconds(),
+           resourceID,
+           executor,
+           Executors.newSingleThreadScheduledExecutor(),
--- End diff --

It is already added in new modifications


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737204
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -1099,4 +1121,23 @@ public void run() {
            });
        }
    }
+
+   private final class ResourceManagerHeartbeatListener implements HeartbeatListener {
+
+       ResourceManagerHeartbeatListener() {
+       }
+
+       @Override
+       public void notifyHeartbeatTimeout(ResourceID resourceID) {
+       }
--- End diff --

It will be added in the new modifications


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855217#comment-15855217
 ] 

ASF GitHub Bot commented on FLINK-4354:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737151
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 ---
@@ -122,4 +122,11 @@ void notifySlotAvailable(
     * @param optionalDiagnostics
     */
    void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
+
+   /**
+    * sends the heartbeat to resource manager from task manager
+    * @param resourceID unique id of the task manager
+    * @param payload the payload information of the task manager
+    */
+   void sendHeartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
--- End diff --

agree with it


>  Implement TaskManager side of heartbeat from ResourceManager
> -
>
> Key: FLINK-4354
> URL: https://issues.apache.org/jira/browse/FLINK-4354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. 
> The {{TaskManager}} transmits its slot availability with each heartbeat. That 
> way, the RM will always know about available slots.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855216#comment-15855216
 ] 

ASF GitHub Bot commented on FLINK-4354:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737093
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -137,6 +140,7 @@ public ResourceManager(
this.jobManagerRegistrations = new HashMap<>(4);
this.taskExecutors = new HashMap<>(8);
this.leaderSessionId = null;
+   this.resourceID = ResourceID.generate();
--- End diff --

Yes, in the current implementation it is determined by 
**ResourceManagerRunner**. This PR was submitted a long time ago, and some 
dependent work was not complete at that time.


>  Implement TaskManager side of heartbeat from ResourceManager
> -
>
> Key: FLINK-4354
> URL: https://issues.apache.org/jira/browse/FLINK-4354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. 
> The {{TaskManager}} transmits its slot availability with each heartbeat. That 
> way, the RM will always know about available slots.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737151
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 ---
@@ -122,4 +122,11 @@ void notifySlotAvailable(
     * @param optionalDiagnostics
     */
    void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
+
+   /**
+    * sends the heartbeat to resource manager from task manager
+    * @param resourceID unique id of the task manager
+    * @param payload the payload information of the task manager
+    */
+   void sendHeartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
--- End diff --

agree with it


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99737093
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -137,6 +140,7 @@ public ResourceManager(
this.jobManagerRegistrations = new HashMap<>(4);
this.taskExecutors = new HashMap<>(8);
this.leaderSessionId = null;
+   this.resourceID = ResourceID.generate();
--- End diff --

Yes, in the current implementation it is determined by 
**ResourceManagerRunner**. This PR was submitted a long time ago, and some 
dependent work was not complete at that time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4354) Implement TaskManager side of heartbeat from ResourceManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855212#comment-15855212
 ] 

ASF GitHub Bot commented on FLINK-4354:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99736682
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -340,7 +344,8 @@ public RegistrationResponse apply(RegistrationResponse registrationResponse, Thr
        final UUID resourceManagerLeaderId,
        final String taskExecutorAddress,
        final ResourceID resourceID,
-       final SlotReport slotReport) {
+       final SlotReport slotReport)
+       {
--- End diff --

It may have been a misoperation; I will be more careful next time.


>  Implement TaskManager side of heartbeat from ResourceManager
> -
>
> Key: FLINK-4354
> URL: https://issues.apache.org/jira/browse/FLINK-4354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{ResourceManager}} initiates heartbeat messages via the {{RmLeaderID}}. 
> The {{TaskManager}} transmits its slot availability with each heartbeat. That 
> way, the RM will always know about available slots.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #2770: [FLINK-4354]Implement TaskManager side of heartbea...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2770#discussion_r99736682
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -340,7 +344,8 @@ public RegistrationResponse apply(RegistrationResponse registrationResponse, Thr
        final UUID resourceManagerLeaderId,
        final String taskExecutorAddress,
        final ResourceID resourceID,
-       final SlotReport slotReport) {
+       final SlotReport slotReport)
+       {
--- End diff --

It may have been a misoperation; I will be more careful next time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855209#comment-15855209
 ] 

ASF GitHub Bot commented on FLINK-4364:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99736386
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -126,6 +129,9 @@
    /** The metric registry in the task manager */
    private final MetricRegistry metricRegistry;

+   /** The heartbeat manager for job manager and resource manager in the task manager */
+   private final HeartbeatManagerImpl heartbeatManager;
--- End diff --

Yes, the current **HeartbeatManagerImpl** is enough for this use, so I did 
not wrap it.


> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99736386
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -126,6 +129,9 @@
    /** The metric registry in the task manager */
    private final MetricRegistry metricRegistry;

+   /** The heartbeat manager for job manager and resource manager in the task manager */
+   private final HeartbeatManagerImpl heartbeatManager;
--- End diff --

Yes, the current **HeartbeatManagerImpl** is enough for this use, so I did 
not wrap it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855207#comment-15855207
 ] 

ASF GitHub Bot commented on FLINK-4364:
---

Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/3151
  
@tillrohrmann . Thank you for the detailed review and comments!

This PR only submits the heartbeat logic on the TM side, because there is 
already a JIRA for the JM heartbeat side.

In my implementation, the JM initiates the heartbeat with 
**HeartbeatManagerSenderImpl** and the TM responds to the heartbeat with 
**HeartbeatManagerImpl**, so the heartbeat process is one-way.

I think it is better to include the JM heartbeat logic in this PR so it is 
easier to understand. I will modify this PR soon. For testing, there already 
exist unit tests for the basic heartbeat logic. Do you mean to add some 
ITCases?
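
To make the one-way flow concrete, a tiny self-contained sketch (simplified 
stand-ins, not Flink's actual heartbeat classes):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// One-way heartbeat: the sender (JM/RM) periodically requests heartbeats,
// while the receiver (TM) only responds and never initiates.
public class OneWayHeartbeatSketch {

    interface Target {
        void requestHeartbeat();
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        // TM side: respond to a request, never schedule one itself.
        Target taskManager = () -> System.out.println("TM: heartbeat response");

        // JM/RM side: schedule periodic requests to every monitored target.
        scheduler.scheduleAtFixedRate(
                taskManager::requestHeartbeat, 0, 1, TimeUnit.SECONDS);

        Thread.sleep(3_500);
        scheduler.shutdownNow();
    }
}
```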


> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3151: [FLINK-4364][runtime] Implement TaskManager side of heartbe...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/3151
  
@tillrohrmann . Thank you for the detailed review and comments!

This PR only submits the heartbeat logic on the TM side, because there is 
already a JIRA for the JM heartbeat side.

In my implementation, the JM initiates the heartbeat with 
**HeartbeatManagerSenderImpl** and the TM responds to the heartbeat with 
**HeartbeatManagerImpl**, so the heartbeat process is one-way.

I think it is better to include the JM heartbeat logic in this PR so it is 
easier to understand. I will modify this PR soon. For testing, there already 
exist unit tests for the basic heartbeat logic. Do you mean to add some 
ITCases?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3138: #Flink-5522 Storm Local Cluster can't work with powermock

2017-02-06 Thread liuyuzhong7
Github user liuyuzhong7 commented on the issue:

https://github.com/apache/flink/pull/3138
  
@StephanEwen What should we do with this pull request?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855194#comment-15855194
 ] 

ASF GitHub Bot commented on FLINK-4364:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99734627
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -741,6 +763,18 @@ private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMast
        jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
    }

+   heartbeatManager.monitorTarget(registrationSuccess.getResourceID(), new HeartbeatTarget() {
+       @Override
+       public void sendHeartbeat(ResourceID resourceID, Object payload) {
+           jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
+       }
+
+       @Override
+       public void requestHeartbeat(ResourceID resourceID, Object payload) {
+           throw new UnsupportedOperationException("Should never call requestHeartbeat in task manager.");
--- End diff --

My previous understanding was that the heartbeat is only requested from the 
RM and JM to the TM, and the TM only responds to it. Do you mean the 
heartbeat can be requested from both sides? If so, the TM also needs to 
schedule heartbeat requests at a regular interval.


> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99734627
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -741,6 +763,18 @@ private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMast
        jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort()));
    }

+   heartbeatManager.monitorTarget(registrationSuccess.getResourceID(), new HeartbeatTarget() {
+       @Override
+       public void sendHeartbeat(ResourceID resourceID, Object payload) {
+           jobMasterGateway.heartbeatFromTaskManager(resourceID, payload);
+       }
+
+       @Override
+       public void requestHeartbeat(ResourceID resourceID, Object payload) {
+           throw new UnsupportedOperationException("Should never call requestHeartbeat in task manager.");
--- End diff --

My previous understanding was that the heartbeat is only requested from the 
RM and JM to the TM, and the TM only responds to it. Do you mean the 
heartbeat can be requested from both sides? If so, the TM also needs to 
schedule heartbeat requests at a regular interval.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855179#comment-15855179
 ] 

ASF GitHub Bot commented on FLINK-4364:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99733319
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ---
@@ -112,6 +114,13 @@ public TaskManagerRunner(
        // Initialize the TM metrics

        TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());

+       HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+           taskManagerConfiguration.getTimeout().toMilliseconds(),
+           resourceID,
+           executor,
+           Executors.newSingleThreadScheduledExecutor(),
--- End diff --

I agree with that. Initially I wanted to reuse the executor in 
**RPCService**, but that needs some modifications. I will introduce it in the 
Runner so it can be shared among the components.


> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99733319
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 ---
@@ -112,6 +114,13 @@ public TaskManagerRunner(
        // Initialize the TM metrics

        TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, taskManagerServices.getNetworkEnvironment());

+       HeartbeatManagerImpl heartbeatManager = new HeartbeatManagerImpl(
+           taskManagerConfiguration.getTimeout().toMilliseconds(),
+           resourceID,
+           executor,
+           Executors.newSingleThreadScheduledExecutor(),
--- End diff --

I agree with that. Initially I wanted to reuse the executor in 
**RPCService**, but that needs some modifications. I will introduce it in the 
Runner so it can be shared among the components.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855171#comment-15855171
 ] 

ASF GitHub Bot commented on FLINK-4364:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99732610
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
        }
    }

+   /**
+    * The heartbeat listener for JobManager and ResourceManager; they can be
+    * distinguished by ResourceID and trigger different processes.
+    */
+   private final class JMRMHeartbeatListener implements HeartbeatListener {
+
+       JMRMHeartbeatListener() {
+       }
+
+       @Override
+       public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+           log.info("Notify heartbeat timeout for resourceID {}", resourceID);
--- End diff --

Yes, it actually should trigger some actions on timeout. I did not submit 
this part yet because I think it is related to the failure detection logic 
and is supposed to be submitted in another PR. To make the heartbeat 
mechanism complete, I will add this part in the following modifications.


> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99732610
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
        }
    }

+   /**
+    * The heartbeat listener for JobManager and ResourceManager; they can be
+    * distinguished by ResourceID and trigger different processes.
+    */
+   private final class JMRMHeartbeatListener implements HeartbeatListener {
+
+       JMRMHeartbeatListener() {
+       }
+
+       @Override
+       public void notifyHeartbeatTimeout(final ResourceID resourceID) {
+           log.info("Notify heartbeat timeout for resourceID {}", resourceID);
--- End diff --

Yes, it actually should trigger some actions on timeout. I did not submit 
this part yet because I think it is related to the failure detection logic 
and is supposed to be submitted in another PR. To make the heartbeat 
mechanism complete, I will add this part in the following modifications.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99732047
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
        }
    }

+   /**
+    * The heartbeat listener for JobManager and ResourceManager; they can be
+    * distinguished by ResourceID and trigger different processes.
+    */
+   private final class JMRMHeartbeatListener implements HeartbeatListener {
--- End diff --

From the TaskExecutor side, it monitors the JM and RM that initiate the 
heartbeat, so JMRMHeartbeatListener indicates a heartbeat timeout with either 
the JM or the RM; the current PR just shows the interaction with the JM part. 
I planned to submit the RM part in another PR, but maybe submitting both 
sides together will make it easier to understand.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855166#comment-15855166
 ] 

ASF GitHub Bot commented on FLINK-4364:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99732047
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 ---
@@ -1058,6 +1092,30 @@ public void handleError(Throwable throwable) {
        }
    }

+   /**
+    * The heartbeat listener for JobManager and ResourceManager; they can be
+    * distinguished by ResourceID and trigger different processes.
+    */
+   private final class JMRMHeartbeatListener implements HeartbeatListener {
--- End diff --

From the TaskExecutor side, it monitors the JM and RM that initiate the 
heartbeat, so JMRMHeartbeatListener indicates a heartbeat timeout with either 
the JM or the RM; the current PR just shows the interaction with the JM part. 
I planned to submit the RM part in another PR, but maybe submitting both 
sides together will make it easier to understand.


> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15855154#comment-15855154
 ] 

ASF GitHub Bot commented on FLINK-4364:
---

Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99731288
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 ---
@@ -218,4 +218,12 @@ void failSlot(final ResourceID taskManagerId,
        final TaskManagerLocation taskManagerLocation,
        final UUID leaderId,
        @RpcTimeout final Time timeout);
+
+   /**
+    * Send the heartbeat to job manager from task manager
+    *
+    * @param resourceID unique id of the task manager
+    * @param payload the payload information from the task manager
+    */
+   void heartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
--- End diff --

Currently we have not decided which specific payload information should be 
attached to the heartbeat, so **Object** is used as a workaround. It should 
be changed to a specific type once that is confirmed. Which payload do you 
think needs to be attached at the moment?
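
For illustration, one way the untyped payload could later be replaced by a 
dedicated type (all names below are made up):

```java
// Hypothetical sketch: a marker interface plus one concrete payload,
// replacing the untyped Object parameter. A plain String stands in for
// Flink's ResourceID here.
interface TaskManagerHeartbeatPayload {}

final class SlotAvailabilityPayload implements TaskManagerHeartbeatPayload {
    final int numAvailableSlots;

    SlotAvailabilityPayload(int numAvailableSlots) {
        this.numAvailableSlots = numAvailableSlots;
    }
}

interface HeartbeatGatewaySketch {
    // Typed variant of heartbeatFromTaskManager(ResourceID, Object).
    void heartbeatFromTaskManager(String resourceId,
            TaskManagerHeartbeatPayload payload);
}
```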


> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3151: [FLINK-4364][runtime] Implement TaskManager side of ...

2017-02-06 Thread wangzhijiang999
Github user wangzhijiang999 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3151#discussion_r99731288
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 ---
@@ -218,4 +218,12 @@ void failSlot(final ResourceID taskManagerId,
        final TaskManagerLocation taskManagerLocation,
        final UUID leaderId,
        @RpcTimeout final Time timeout);
+
+   /**
+    * Send the heartbeat to job manager from task manager
+    *
+    * @param resourceID unique id of the task manager
+    * @param payload the payload information from the task manager
+    */
+   void heartbeatFromTaskManager(final ResourceID resourceID, final Object payload);
--- End diff --

Currently we have not decided which specific payload information should be 
attached to the heartbeat, so **Object** is used as a workaround. It should 
be changed to a specific type once that is confirmed. Which payload do you 
think needs to be attached at the moment?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5725) Support JOIN between two streams in the SQL API

2017-02-06 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-5725:
---
Component/s: Table API & SQL

> Support JOIN between two streams in the SQL API
> ---
>
> Key: FLINK-5725
> URL: https://issues.apache.org/jira/browse/FLINK-5725
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> As described in the title.
> This jira proposes to support joining two streaming tables in the SQL API.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5583) Support flexible error handling in the Kafka consumer

2017-02-06 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854944#comment-15854944
 ] 

Haohui Mai commented on FLINK-5583:
---

In general (1) sounds good to me. Taking a closer look, it seems that it might 
require a stateful API instead of the traditional {{Collector}} APIs.

We have a mission-critical use case that needs to write all corrupted messages 
to a persistent store so that these messages can be inspected and backfilled 
later. Ideally the {{DeserializationSchema}} could hold some state, which 
probably will need to be synchronized when checkpoints happen.

It might be more natural to deserialize messages as a subsequent stage of the 
consumer. Thoughts?

[~rmetzger] [~tzulitai]
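
For illustration, a minimal sketch of that two-stage idea: the source emits 
raw bytes, and a downstream flatMap decodes them, routing corrupted records 
to a store instead of failing the pipeline (the decoder and the store hook 
below are stubs):

{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DecodeAsSecondStage {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for a Kafka source that emits raw, undecoded bytes.
        DataStream<byte[]> raw =
                env.fromElements("good".getBytes(), "bad".getBytes());

        raw.flatMap(new FlatMapFunction<byte[], String>() {
            @Override
            public void flatMap(byte[] bytes, Collector<String> out) {
                try {
                    out.collect(decode(bytes));  // stubbed decoder
                } catch (Exception corrupted) {
                    storeForBackfill(bytes);     // stubbed persistent store
                }
            }
        }).print();

        env.execute("decode-as-second-stage");
    }

    private static String decode(byte[] bytes) {
        String s = new String(bytes);
        if (s.equals("bad")) {
            throw new IllegalArgumentException("corrupted record");
        }
        return s;
    }

    private static void storeForBackfill(byte[] bytes) {
        // e.g. write the raw bytes somewhere for later inspection / backfill
    }
}
{code}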

> Support flexible error handling in the Kafka consumer
> -
>
> Key: FLINK-5583
> URL: https://issues.apache.org/jira/browse/FLINK-5583
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> We found that it is valuable to allow the applications to handle errors and 
> exceptions in the Kafka consumer in order to build a robust application in 
> production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The 
> streaming pipeline might want to bail out (which is the current behavior) or 
> to skip the corrupted records depending on the applications.
> Two options are available:
> (1) Have a variant of DeserializationSchema to return a FlatMap like 
> structure as suggested in FLINK-3679.
> (2) Allow the applications to catch and handle the exception by exposing some 
> APIs that are similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5624) Support tumbling window on streaming tables in the SQL API

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854864#comment-15854864
 ] 

ASF GitHub Bot commented on FLINK-5624:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3252
  
Updated the PR to recognize the `GROUP BY` clause instead of the `OVER` 
clause.


> Support tumbling window on streaming tables in the SQL API
> --
>
> Key: FLINK-5624
> URL: https://issues.apache.org/jira/browse/FLINK-5624
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> This is a follow up of FLINK-4691.
> FLINK-4691 adds supports for group-windows for streaming tables. This jira 
> proposes to expose the functionality in the SQL layer via the {{GROUP BY}} 
> clauses, as described in 
> http://calcite.apache.org/docs/stream.html#tumbling-windows.
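
For reference, the kind of query this would enable, following the TUMBLE 
syntax from the Calcite page above (the table and column names below are made 
up, and a registered streaming table is assumed):

{code:java}
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Sketch only: assumes a streaming table "orders" with a "rowtime" column
// has already been registered on the given environment.
public class TumbleQuerySketch {

    public static Table hourlyCounts(StreamTableEnvironment tableEnv) {
        return tableEnv.sql(
            "SELECT TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS window_start, " +
            "       COUNT(*) AS cnt " +
            "FROM orders " +
            "GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)");
    }
}
{code}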



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

2017-02-06 Thread haohui
Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3252
  
Updated the PR to recognize the `GROUP BY` clause instead of the `OVER` 
clause.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5725) Support JOIN between two streams in the SQL API

2017-02-06 Thread Haohui Mai (JIRA)
Haohui Mai created FLINK-5725:
-

 Summary: Support JOIN between two streams in the SQL API
 Key: FLINK-5725
 URL: https://issues.apache.org/jira/browse/FLINK-5725
 Project: Flink
  Issue Type: New Feature
Reporter: Haohui Mai
Assignee: Haohui Mai


As described in the title.

This jira proposes to support joining two streaming tables in the SQL API.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5624) Support tumbling window on streaming tables in the SQL API

2017-02-06 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854818#comment-15854818
 ] 

Haohui Mai commented on FLINK-5624:
---

Thanks for the explanation.

I took a closer look at the built-in function route. Recognizing the function 
is relatively straightforward. The problem is that it is difficult for the 
{{rowtime()}} function to refer to the corresponding {{DataStream}} at the code 
generation phase. It might be easier to add some system / virtual columns 
instead.

> Support tumbling window on streaming tables in the SQL API
> --
>
> Key: FLINK-5624
> URL: https://issues.apache.org/jira/browse/FLINK-5624
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> This is a follow up of FLINK-4691.
> FLINK-4691 adds supports for group-windows for streaming tables. This jira 
> proposes to expose the functionality in the SQL layer via the {{GROUP BY}} 
> clauses, as described in 
> http://calcite.apache.org/docs/stream.html#tumbling-windows.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5718) Handle JVM Fatal Exceptions in Tasks

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854677#comment-15854677
 ] 

ASF GitHub Bot commented on FLINK-5718:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/3276#discussion_r99674991
  
--- Diff: docs/setup/config.md ---
@@ -86,7 +86,7 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m

 - `taskmanager.memory.segment-size`: The size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).

-- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`.  If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC.
+- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`.  If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC. **Note:** For streaming setups, we highly recommend to set this value to `false` as the core state backends currently do not use the managed memory.
--- End diff --

Should this warning also be added to `flink-conf.yaml`?
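
If it were, a sketch of the recommended streaming combination (set 
programmatically here just for illustration; the key strings are written out 
literally):

```java
import org.apache.flink.configuration.Configuration;

public class PreallocateConfigSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Recommended for streaming setups per the docs change above: the
        // core state backends do not use managed memory, so do not
        // preallocate it.
        config.setBoolean("taskmanager.memory.preallocate", false);

        // If off-heap memory were enabled instead, the docs advise enabling
        // preallocation as well:
        // config.setBoolean("taskmanager.memory.off-heap", true);
        // config.setBoolean("taskmanager.memory.preallocate", true);

        System.out.println(config);
    }
}
```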


> Handle JVM Fatal Exceptions in Tasks
> 
>
> Key: FLINK-5718
> URL: https://issues.apache.org/jira/browse/FLINK-5718
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> The TaskManager catches and handles all types of exceptions right now (all 
> {{Throwables}}). The intention behind that is:
>   - Many {{Error}} subclasses are recoverable for the TaskManagers, such as 
> failure to load/link user code
>   - We want to give eager notifications to the JobManager in case something 
> in a task goes wrong.
> However, there are some exceptions which should probably simply terminate the 
> JVM, if caught in the task thread, because they may leave the JVM in a 
> dysfunctional limbo state:
>   - {{OutOfMemoryError}}
>   - {{InternalError}}
>   - {{UnknownError}}
>   - {{ZipError}}
> These are basically the subclasses of {{VirtualMachineError}}, except for 
> {{StackOverflowError}}, which is recoverable and usually recovered already by 
> the time the exception has been thrown and the stack unwound.
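
A minimal sketch of the classification described above (illustrative only, 
not the actual Flink utility):

{code:java}
public final class FatalErrors {

    // VirtualMachineError covers OutOfMemoryError, InternalError (and its
    // subclass ZipError), and UnknownError; StackOverflowError is excluded
    // because it is usually already recovered by the time it is caught.
    public static boolean isJvmFatalError(Throwable t) {
        return t instanceof VirtualMachineError
                && !(t instanceof StackOverflowError);
    }

    public static void main(String[] args) {
        System.out.println(isJvmFatalError(new OutOfMemoryError()));   // true
        System.out.println(isJvmFatalError(new StackOverflowError())); // false
        System.out.println(isJvmFatalError(new RuntimeException()));   // false
    }
}
{code}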



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3276: [FLINK-5718] [core] TaskManagers exit the JVM on f...

2017-02-06 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/3276#discussion_r99674991
  
--- Diff: docs/setup/config.md ---
@@ -86,7 +86,7 @@ The default fraction for managed memory can be adjusted using the `taskmanager.m

 - `taskmanager.memory.segment-size`: The size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).

-- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`.  If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC.
+- `taskmanager.memory.preallocate`: Can be either of `true` or `false`. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When `taskmanager.memory.off-heap` is set to `true`, then it is advised that this configuration is also set to `true`.  If this configuration is set to `false` cleaning up of the allocated offheap memory happens only when the configured JVM parameter MaxDirectMemorySize is reached by triggering a full GC. **Note:** For streaming setups, we highly recommend to set this value to `false` as the core state backends currently do not use the managed memory.
--- End diff --

Should this warning also be added to `flink-conf.yaml`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3280: [Flink-5724] Error in documentation zipping elemen...

2017-02-06 Thread Fokko
GitHub user Fokko opened a pull request:

https://github.com/apache/flink/pull/3280

[Flink-5724] Error in documentation zipping elements

Because the Scala tab is defined twice, it is not possible to open
the Python tab.

Please look at the documentation page itself:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/zip_elements_guide.html

I believe it is a copy-paste error (which happens to me all too often as well ;)


Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Fokko/flink fd-fix-docs-zipping-elements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3280.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3280


commit f490fbcd3633f77e87d89ba397c2b1ed1a030543
Author: Fokko Driesprong 
Date:   2017-02-06T20:08:31Z

Error in documentation zipping elements

Because the Scala tab is defined twice, it is not possible to open
the Python tab.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3163) Configure Flink for NUMA systems

2017-02-06 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3163:
--
Fix Version/s: 1.3.0

> Configure Flink for NUMA systems
> 
>
> Key: FLINK-3163
> URL: https://issues.apache.org/jira/browse/FLINK-3163
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> On NUMA systems Flink can be pinned to a single physical processor ("node") 
> using {{numactl --membind=$node --cpunodebind=$node }}. Commonly 
> available NUMA systems include the largest AWS and Google Compute instances.
> For example, on an AWS c4.8xlarge system with 36 hyperthreads the user could 
> configure a single TaskManager with 36 slots or have Flink create two 
> TaskManagers bound to each of the NUMA nodes, each with 18 slots.
> There may be some extra overhead in transferring network buffers between 
> TaskManagers on the same system, though the fraction of data shuffled in this 
> manner decreases with the size of the cluster. The performance improvement 
> from only accessing local memory looks to be significant though difficult to 
> benchmark.
> The JobManagers may fit into NUMA nodes rather than requiring full systems.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-2908) Web interface redraw web plan when browser resized

2017-02-06 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-2908:
--
Fix Version/s: 1.3.0

> Web interface redraw web plan when browser resized
> --
>
> Key: FLINK-2908
> URL: https://issues.apache.org/jira/browse/FLINK-2908
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 0.10.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.3.0
>
>
> The job plan graph does not resize when the user expands the browser window 
> (only a change in width matters).
> To reproduce: 1) open the plan tab of a running or completed job in a 
> non-maximized browser window (not full width), 2) maximize the browser window.
> Workaround: refresh the web page.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5724) Error in the 'Zipping Elements' docs

2017-02-06 Thread Fokko Driesprong (JIRA)
Fokko Driesprong created FLINK-5724:
---

 Summary: Error in the 'Zipping Elements' docs
 Key: FLINK-5724
 URL: https://issues.apache.org/jira/browse/FLINK-5724
 Project: Flink
  Issue Type: Bug
Reporter: Fokko Driesprong


The tab for the Python documentation isn't working because there are two tabs 
pointing at the Scala example.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-2883) Add documentation to forbid key-modifying ReduceFunction

2017-02-06 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-2883.
-
Resolution: Implemented

1.2: 37d6f8196f12714491c95adb73319bb36e7a0d34
master: 11ebf484280314231d146dcfb0b973934448f00b

> Add documentation to forbid key-modifying ReduceFunction
> 
>
> Key: FLINK-2883
> URL: https://issues.apache.org/jira/browse/FLINK-2883
> Project: Flink
>  Issue Type: Task
>  Components: DataStream API, Documentation
>Affects Versions: 0.10.0
>Reporter: Till Rohrmann
>Assignee: Greg Hogan
> Fix For: 1.3.0, 1.2.1
>
>
> If one uses a combinable reduce operation which also changes the key value of 
> the underlying data element, then the results of the reduce operation can 
> become wrong. The reason is that after the combine phase, another reduce 
> operator is executed which will then reduce the elements based on the new key 
> values. This might not be so surprising if one explicitly defined one's 
> {{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}} 
> conceals the fact that a combiner is used implicitly. Furthermore, the API 
> does not prevent the user from changing the key fields, even though preventing 
> this would solve the problem.
> The following example program illustrates the problem
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(1)
> val input = env.fromElements((1,2), (1,3), (2,3), (3,3), (3,4))
> val result = input.groupBy(0).reduce{
>   (left, right) =>
>     (left._1 + right._1, left._2 + right._2)
> }
> result.output(new PrintingOutputFormat[Int]())
> env.execute()
> {code}
> The expected output is 
> {code}
> (2, 5)
> (2, 3)
> (6, 7)
> {code}
> However, the actual output is
> {code}
> (4, 8)
> (6, 7)
> {code}
> I think that the underlying problem is that associativity and commutativity 
> are not sufficient for a combinable reduce operation. Additionally, we also 
> need to make sure that the key stays the same.
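
For illustration, a key-preserving variant of the program above, sketched with the Java DataSet API (my translation, not taken from the issue):
{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

// Keep the grouping key (field 0) unchanged and aggregate only the value
// field, so the implicit combiner cannot regroup records under new keys.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Integer, Integer>> input = env.fromElements(
    Tuple2.of(1, 2), Tuple2.of(1, 3), Tuple2.of(2, 3), Tuple2.of(3, 3), Tuple2.of(3, 4));
DataSet<Tuple2<Integer, Integer>> result = input
    .groupBy(0)
    .reduce((left, right) -> Tuple2.of(left.f0, left.f1 + right.f1));
{code}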



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5680) Document env.ssh.opts

2017-02-06 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-5680.
-
Resolution: Implemented

1.2: 57cd0713183b4a2a2463736f672e1cd74cd913d6
master: 327ff060a7b64862aa6a33b6c8b07bbef05bde76

> Document env.ssh.opts
> -
>
> Key: FLINK-5680
> URL: https://issues.apache.org/jira/browse/FLINK-5680
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> Document {{env.ssh.opts}} in {{setup/config.html}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5680) Document env.ssh.opts

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854636#comment-15854636
 ] 

ASF GitHub Bot commented on FLINK-5680:
---

Github user greghogan closed the pull request at:

https://github.com/apache/flink/pull/3247


> Document env.ssh.opts
> -
>
> Key: FLINK-5680
> URL: https://issues.apache.org/jira/browse/FLINK-5680
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.3.0, 1.2.1
>
>
> Document {{env.ssh.opts}} in {{setup/config.html}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-2883) Add documentation to forbid key-modifying ReduceFunction

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854637#comment-15854637
 ] 

ASF GitHub Bot commented on FLINK-2883:
---

Github user greghogan closed the pull request at:

https://github.com/apache/flink/pull/3256


> Add documentation to forbid key-modifying ReduceFunction
> 
>
> Key: FLINK-2883
> URL: https://issues.apache.org/jira/browse/FLINK-2883
> Project: Flink
>  Issue Type: Task
>  Components: DataStream API, Documentation
>Affects Versions: 0.10.0
>Reporter: Till Rohrmann
>Assignee: Greg Hogan
> Fix For: 1.3.0, 1.2.1
>
>
> If one uses a combinable reduce operation which also changes the key value of 
> the underlying data element, then the results of the reduce operation can 
> become wrong. The reason is that after the combine phase, another reduce 
> operator is executed which will then reduce the elements based on the new key 
> values. This might not be so surprising if one explicitly defined one's 
> {{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}} 
> conceals the fact that a combiner is used implicitly. Furthermore, the API 
> does not prevent the user from changing the key fields, even though preventing 
> this would solve the problem.
> The following example program illustrates the problem
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(1)
> val input = env.fromElements((1,2), (1,3), (2,3), (3,3), (3,4))
> val result = input.groupBy(0).reduce{
>   (left, right) =>
>     (left._1 + right._1, left._2 + right._2)
> }
> result.output(new PrintingOutputFormat[Int]())
> env.execute()
> {code}
> The expected output is 
> {code}
> (2, 5)
> (2, 3)
> (6, 7)
> {code}
> However, the actual output is
> {code}
> (4, 8)
> (6, 7)
> {code}
> I think that the underlying problem is that associativity and commutativity 
> are not sufficient for a combinable reduce operation. Additionally, we also 
> need to make sure that the key stays the same.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3256: [FLINK-2883] [docs] Add documentation to forbid ke...

2017-02-06 Thread greghogan
Github user greghogan closed the pull request at:

https://github.com/apache/flink/pull/3256


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3247: [FLINK-5680] [docs] Document env.ssh.opts

2017-02-06 Thread greghogan
Github user greghogan closed the pull request at:

https://github.com/apache/flink/pull/3247


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5723) Use "Used" instead of "Initial" to make taskmanager tag more readable

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854572#comment-15854572
 ] 

ASF GitHub Bot commented on FLINK-5723:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3274
  
you have to modify the corresponding Jade files; modifications to the HTML 
files will be overwritten the next time someone builds the web UI.


> Use "Used" instead of "Initial" to make taskmanager tag more readable
> -
>
> Key: FLINK-5723
> URL: https://issues.apache.org/jira/browse/FLINK-5723
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Tao Wang
>Priority: Trivial
>
> Now in the JobManager web frontend, the used memory of task managers is presented 
> as "Initial" in the table header, which, judging from the code, actually means "memory used".
> I'd like to change it to be more readable, even if it is a trivial change.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3274: [FLINK-5723][UI]Use Used instead of Initial to make taskm...

2017-02-06 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3274
  
you have to modify the corresponding Jade files; modifications to the HTML 
files will be overwritten the next time someone builds the web UI.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5376) Misleading log statements in UnorderedStreamElementQueue

2017-02-06 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-5376:
--
Description: 
The following are two examples where ordered stream element queue is mentioned:
{code}
LOG.debug("Put element into ordered stream element queue. New filling 
degree " +
  "({}/{}).", numberEntries, capacity);

return true;
  } else {
LOG.debug("Failed to put element into ordered stream element queue 
because it " +
{code}
I guess OrderedStreamElementQueue was coded first.

  was:
The following are two examples where ordered stream element queue is mentioned:
{code}
LOG.debug("Put element into ordered stream element queue. New filling 
degree " +
  "({}/{}).", numberEntries, capacity);

return true;
  } else {
LOG.debug("Failed to put element into ordered stream element queue 
because it " +
{code}

I guess OrderedStreamElementQueue was coded first.


> Misleading log statements in UnorderedStreamElementQueue
> 
>
> Key: FLINK-5376
> URL: https://issues.apache.org/jira/browse/FLINK-5376
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Priority: Minor
>
> The following are two examples where ordered stream element queue is 
> mentioned:
> {code}
> LOG.debug("Put element into ordered stream element queue. New filling 
> degree " +
>   "({}/{}).", numberEntries, capacity);
> return true;
>   } else {
> LOG.debug("Failed to put element into ordered stream element queue 
> because it " +
> {code}
> I guess OrderedStreamElementQueue was coded first.
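
Presumably the fix is simply to make the messages name the unordered queue, e.g. (a sketch, not the actual patch):
{code}
// Corrected message for UnorderedStreamElementQueue:
LOG.debug("Put element into unordered stream element queue. New filling degree " +
    "({}/{}).", numberEntries, capacity);
{code}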



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-02-06 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15822346#comment-15822346
 ] 

Ted Yu edited comment on FLINK-5486 at 2/6/17 6:20 PM:
---

Lock on State.bucketStates should be held in the following method:
{code}
private void handleRestoredBucketState(State restoredState) {
    Preconditions.checkNotNull(restoredState);

    for (BucketState bucketState : restoredState.bucketStates.values()) {
{code}
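
A minimal sketch of what holding that lock could look like (types and field names as quoted above; the loop body is elided):
{code}
private void handleRestoredBucketState(State restoredState) {
    Preconditions.checkNotNull(restoredState);

    // Iterate the restored bucket states under the lock so that concurrent
    // modifications cannot interleave with the restore logic.
    synchronized (restoredState.bucketStates) {
        for (BucketState bucketState : restoredState.bucketStates.values()) {
            // ... handle the restored bucket ...
        }
    }
}
{code}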


was (Author: yuzhih...@gmail.com):
Lock on State.bucketStates should be held in the following method:

{code}
private void handleRestoredBucketState(State restoredState) {
    Preconditions.checkNotNull(restoredState);

    for (BucketState bucketState : restoredState.bucketStates.values()) {
{code}

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>
> Here is related code:
> {code}
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
> synchronized (bucketState.pendingFilesPerCheckpoint) {
>     bucketState.pendingFilesPerCheckpoint.clear();
> }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.
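
A minimal sketch of the suggested change (names as quoted above):
{code}
// Process and clear the pending files under one lock, so no entries can be
// cleared while they are still being handled.
synchronized (bucketState.pendingFilesPerCheckpoint) {
    handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
    bucketState.pendingFilesPerCheckpoint.clear();
}
{code}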



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5488) yarnClient should be closed in AbstractYarnClusterDescriptor for error conditions

2017-02-06 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-5488:
--
Description: 
Here is one example:
{code}
if(jobManagerMemoryMb > maxRes.getMemory() ) {
  failSessionDuringDeployment(yarnClient, yarnApplication);
  throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
      + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
}
{code}

yarnClient implements Closeable.
It should be closed in situations where an exception is thrown.

  was:
Here is one example:
{code}
if(jobManagerMemoryMb > maxRes.getMemory() ) {
  failSessionDuringDeployment(yarnClient, yarnApplication);
  throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
      + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
}
{code}
yarnClient implements Closeable.
It should be closed in situations where an exception is thrown.


> yarnClient should be closed in AbstractYarnClusterDescriptor for error 
> conditions
> -
>
> Key: FLINK-5488
> URL: https://issues.apache.org/jira/browse/FLINK-5488
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Reporter: Ted Yu
>
> Here is one example:
> {code}
> if(jobManagerMemoryMb > maxRes.getMemory() ) {
>   failSessionDuringDeployment(yarnClient, yarnApplication);
>   throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
>       + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
> }
> {code}
> yarnClient implements Closeable.
> It should be closed in situations where an exception is thrown.
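
One way to sketch such a cleanup (the surrounding structure is assumed, not taken from the actual class):
{code}
try {
    if(jobManagerMemoryMb > maxRes.getMemory() ) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw new YarnDeploymentException("The cluster does not have the requested JobManager memory available.");
    }
    // ... remaining deployment steps ...
} catch (Exception e) {
    // Release the Closeable client on the failure path; on success it stays
    // open because the session still needs it.
    yarnClient.close();
    throw e;
}
{code}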



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-3163) Configure Flink for NUMA systems

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854437#comment-15854437
 ] 

ASF GitHub Bot commented on FLINK-3163:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3249
  
@StephanEwen from the discussion of FLINK-3163 I also had the idea of 
`taskmanager.compute.fraction`, where the number of slots would be a multiple of 
the number of cores / vcores. Since Flink processes these config keys as opaque 
strings, their naming only serves to help organize the [config 
page](https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html).

I have found YARN-5764, MESOS-5342, and MESOS-314 discussing NUMA support 
for containers but all are works in progress. I see that Docker supports 
`--cpuset-cpus` and `--cpuset-mems` in [docker 
run](https://docs.docker.com/engine/reference/run/) and in [docker 
compose](https://docs.docker.com/compose/compose-file) config version 2 (using 
`cpuset`). It's not clear how to dynamically bind Flink to NUMA nodes without 
scripting Flink's Docker commands.


> Configure Flink for NUMA systems
> 
>
> Key: FLINK-3163
> URL: https://issues.apache.org/jira/browse/FLINK-3163
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> On NUMA systems Flink can be pinned to a single physical processor ("node") 
> using {{numactl --membind=$node --cpunodebind=$node }}. Commonly 
> available NUMA systems include the largest AWS and Google Compute instances.
> For example, on an AWS c4.8xlarge system with 36 hyperthreads the user could 
> configure a single TaskManager with 36 slots or have Flink create two 
> TaskManagers bound to each of the NUMA nodes, each with 18 slots.
> There may be some extra overhead in transferring network buffers between 
> TaskManagers on the same system, though the fraction of data shuffled in this 
> manner decreases with the size of the cluster. The performance improvement 
> from only accessing local memory looks to be significant though difficult to 
> benchmark.
> The JobManagers may fit into NUMA nodes rather than requiring full systems.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3249: [FLINK-3163] [scripts] Configure Flink for NUMA systems

2017-02-06 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/3249
  
@StephanEwen from the discussion of FLINK-3163 I also had the idea of 
`taskmanager.compute.fraction`, where the number of slots would be a multiple of 
the number of cores / vcores. Since Flink processes these config keys as opaque 
strings, their naming only serves to help organize the [config 
page](https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html).

I have found YARN-5764, MESOS-5342, and MESOS-314 discussing NUMA support 
for containers but all are works in progress. I see that Docker supports 
`--cpuset-cpus` and `--cpuset-mems` in [docker 
run](https://docs.docker.com/engine/reference/run/) and in [docker 
compose](https://docs.docker.com/compose/compose-file) config version 2 (using 
`cpuset`). It's not clear how to dynamically bind Flink to NUMA nodes without 
scripting Flink's Docker commands.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3279: [FLINK-5618][docs] createSerializer must actually ...

2017-02-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3279


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5618) Add queryable state documentation

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854417#comment-15854417
 ] 

ASF GitHub Bot commented on FLINK-5618:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3279


> Add queryable state documentation
> -
>
> Key: FLINK-5618
> URL: https://issues.apache.org/jira/browse/FLINK-5618
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Ufuk Celebi
> Fix For: 1.2.0, 1.3.0
>
>
> Adds docs about how to use queryable state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3278: [FLINK-5701] [kafka] FlinkKafkaProducer should check asyn...

2017-02-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3278
  
@tillrohrmann 
Yes, under the condition you described, at-least-once doesn't hold. 
I said _might_ mainly because there is a chance that, for every checkpoint 
barrier, the previous events (i.e. `event1` in your example) have already been 
committed. But yes, essentially this indeterminate behaviour means that it is 
not at-least-once, so I'm incorrect in saying that it _might_.

I was trying to point out that this indeterminate behaviour made it hard to 
have a stable test for `testAtLeastOnceProducerFailsIfFlushingDisabled()` 
without relying on sleeping, which ultimately leads to flaky tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5701) FlinkKafkaProducer should check asyncException on checkpoints

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854409#comment-15854409
 ] 

ASF GitHub Bot commented on FLINK-5701:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3278
  
@tillrohrmann 
Yes, under the condition you described, at-least-once doesn't hold. 
I said _might_ mainly because there is a chance that, for every checkpoint 
barrier, the previous events (i.e. `event1` in your example) have already been 
committed. But yes, essentially this indeterminate behaviour means that it is 
not at-least-once, so I'm incorrect in saying that it _might_.

I was trying to point out that this indeterminate behaviour made it hard to 
have a stable test for `testAtLeastOnceProducerFailsIfFlushingDisabled()` 
without relying on sleeping, which ultimately leads to flaky tests.


> FlinkKafkaProducer should check asyncException on checkpoints
> -
>
> Key: FLINK-5701
> URL: https://issues.apache.org/jira/browse/FLINK-5701
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback, used to check if the producer 
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should consider the checkpoint successful only if, 
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the 
> {{snapshotState}} method both before and after the flush that brings 
> {{pendingRecords}} to 0.
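
The quick fix could look roughly like this ({{checkErroneous}} and {{flush}} are assumed helper names, not necessarily the actual ones):
{code}
// Sketch: fail the checkpoint on async send errors seen before or during flush.
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    checkErroneous();   // rethrow async send failures seen so far
    flush();            // block until pendingRecords == 0
    checkErroneous();   // rethrow failures raised during the flush itself
}
{code}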



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5564) User Defined Aggregates

2017-02-06 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854402#comment-15854402
 ] 

Fabian Hueske edited comment on FLINK-5564 at 2/6/17 5:19 PM:
--

Hi [~Shaoxuan Wang], I think it should be possible to split the first three 
steps as follows:

1. Add the new UDAGG interface and migrate existing aggregation functions to it.
We keep the current implementation and just add code and unit tests for the 
implementations of the aggregation functions that implement the new interface.
2. Use the new aggregation function for batch tables.
We switch the implementation for the DataSet runtime code. We still keep the 
old aggregation functions for the streaming code.
3. Use the new aggregation function for streaming tables.
We switch the implementation for the DataStream runtime code. In this step we 
remove the old aggregation functions and clean up.

Addressing 1, 2, and 3 in a single issue will result in a huge PR which will be 
hard to review. I'd prefer several smaller steps with a well-defined scope.

Thanks, Fabian



was (Author: fhueske):
Hi [~Shaoxuan Wang], I think it should be possible to split the first three 
steps as follows:

1. Add the new UDAGG interface and migrate existing aggregation functions to it.
We keep the current implementation and just add code and unit tests for the 
implementations of the aggregation functions that implement the new interface.
2. Use the new aggregation function for batch tables.
We switch the implementation for the DataSet runtime code. We still keep the 
old aggregation functions for the streaming code.
3. Use the new aggregation function for streaming tables.
We switch the implementation for the DataStream runtime code. In this step we 
remove the old aggregation functions and clean up.

Addressing 1, 2, and 3 in a single issue will result in a huge PR which will be 
hard to review. I'd prefer several smaller steps with a well-defined scope.

Regarding the discussion of the window OVER functions: it would be great if you 
could post your comment (with which I agree) to the corresponding JIRA issue 
and to the discussion on the dev list; otherwise it might not be noticed.

Thanks, Fabian


> User Defined Aggregates
> ---
>
> Key: FLINK-5564
> URL: https://issues.apache.org/jira/browse/FLINK-5564
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> User-defined aggregates would be a great addition to the Table API / SQL.
> User-defined aggregates would be a great addition to the Table API / SQL.
> The current aggregate interface is not well suited for external users.
> This issue proposes to redesign the aggregate such that we can expose a 
> better external UDAGG interface to the users. The detailed design proposal 
> can be found here: 
> https://docs.google.com/document/d/19JXK8jLIi8IqV9yf7hOs_Oz67yXOypY7Uh5gIOK2r-U/edit
> Motivation:
> 1. The current aggregate interface is not very concise for users. One 
> needs to know the design details of the intermediate Row buffer before 
> implementing an Aggregate. Seven functions are needed even for a simple Count 
> aggregate.
> 2. Another limitation of the current aggregate function is that it can only be 
> applied to a single column. There are many scenarios which require the 
> aggregate function to take multiple columns as input.
> 3. “Retraction” is not considered or covered in the current Aggregate.
> 4. It might be very good to have a local/global aggregate query plan 
> optimization, which is a promising way to improve UDAGG performance in some 
> scenarios.
> Proposed Changes:
> 1. Implement an aggregate dataStream API (Done by 
> [FLINK-5582|https://issues.apache.org/jira/browse/FLINK-5582])
> 2. Update all the existing aggregates to use the new aggregate dataStream API
> 3. Provide a better User-Defined Aggregate interface
> 4. Add retraction support
> 5. Add local/global aggregate
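
For a sense of scale, a purely hypothetical sketch of a more concise UDAGG interface (the names are invented here, not taken from the design document):
{code}
// Three core methods instead of seven, plus optional hooks for retraction
// and for local/global (partial) aggregation.
public abstract class AggregateFunction<IN, ACC, OUT> {
    public abstract ACC createAccumulator();
    public abstract void accumulate(ACC accumulator, IN value);
    public abstract OUT getValue(ACC accumulator);

    public void retract(ACC accumulator, IN value) {
        throw new UnsupportedOperationException("retraction not supported");
    }
    public ACC merge(ACC a, ACC b) {
        throw new UnsupportedOperationException("merge not supported");
    }
}
{code}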



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5564) User Defined Aggregates

2017-02-06 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854402#comment-15854402
 ] 

Fabian Hueske commented on FLINK-5564:
--

Hi [~Shaoxuan Wang], I think it should be possible to split the first three 
steps as follows:

1. Add the new UDAGG interface and migrate existing aggregation functions to it.
We keep the current implementation and just add code and unit tests for the 
implementations of the aggregation functions that implement the new interface.
2. Use the new aggregation function for batch tables.
We switch the implementation for the DataSet runtime code. We still keep the 
old aggregation functions for the streaming code.
3. Use the new aggregation function for streaming tables.
We switch the implementation for the DataStream runtime code. In this step we 
remove the old aggregation functions and clean up.

Addressing 1, 2, and 3 in a single issue will result in a huge PR which will be 
hard to review. I'd prefer several smaller steps with a well-defined scope.

Regarding the discussion of the window OVER functions: it would be great if you 
could post your comment (with which I agree) to the corresponding JIRA issue 
and to the discussion on the dev list; otherwise it might not be noticed.

Thanks, Fabian


> User Defined Aggregates
> ---
>
> Key: FLINK-5564
> URL: https://issues.apache.org/jira/browse/FLINK-5564
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> User-defined aggregates would be a great addition to the Table API / SQL.
> The current aggregate interface is not well suited for external users.
> This issue proposes to redesign the aggregate such that we can expose a 
> better external UDAGG interface to the users. The detailed design proposal 
> can be found here: 
> https://docs.google.com/document/d/19JXK8jLIi8IqV9yf7hOs_Oz67yXOypY7Uh5gIOK2r-U/edit
> Motivation:
> 1. The current aggregate interface is not very concise for users. One 
> needs to know the design details of the intermediate Row buffer before 
> implementing an Aggregate. Seven functions are needed even for a simple Count 
> aggregate.
> 2. Another limitation of the current aggregate function is that it can only be 
> applied to a single column. There are many scenarios which require the 
> aggregate function to take multiple columns as input.
> 3. “Retraction” is not considered or covered in the current Aggregate.
> 4. It might be very good to have a local/global aggregate query plan 
> optimization, which is a promising way to improve UDAGG performance in some 
> scenarios.
> Proposed Changes:
> 1. Implement an aggregate dataStream API (Done by 
> [FLINK-5582|https://issues.apache.org/jira/browse/FLINK-5582])
> 2. Update all the existing aggregates to use the new aggregate dataStream API
> 3. Provide a better User-Defined Aggregate interface
> 4. Add retraction support
> 5. Add local/global aggregate



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854396#comment-15854396
 ] 

ASF GitHub Bot commented on FLINK-5575:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3242


> in old releases, warn users and guide them to the latest stable docs
> 
>
> Key: FLINK-5575
> URL: https://issues.apache.org/jira/browse/FLINK-5575
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
> Fix For: 1.0.4, 1.2.0, 1.3.0, 1.1.5
>
>
> Old versions of Flink (especially version 0.8) are being frequently studied, 
> downloaded, and used by new users (because Google leads them there). I 
> propose to guide folks to the latest stable release by adding a link on every 
> documentation page in the old docs that links to the home page of the latest 
> stable docs. 
> The redirect lives at flink.apache.org/q/stable-docs.html, and will need to 
> be modified with each major release (e.g. when 1.2 is released).
> This problem affects all releases before 1.1, but the stats show that 0.8, 
> 0.9, 0.10, and 1.0 are the most important to deal with.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3242: [FLINK-5575][docs] reworked the mechanism that war...

2017-02-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3242


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs

2017-02-06 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-5575.
--
   Resolution: Fixed
Fix Version/s: 1.2.0
   1.1.5
   1.3.0
   1.0.4

Fixed in dd7311e (master), 6986f49 (release-1.2), 3cd7c8e (release-1.1), 
63660ca (release-1.0), 43f8261 (release-0.10), 3ea8ae3 (release-0.9), f6f823e 
(release-0.8).

> in old releases, warn users and guide them to the latest stable docs
> 
>
> Key: FLINK-5575
> URL: https://issues.apache.org/jira/browse/FLINK-5575
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
> Fix For: 1.0.4, 1.3.0, 1.1.5, 1.2.0
>
>
> Old versions of Flink (especially version 0.8) are being frequently studied, 
> downloaded, and used by new users (because Google leads them there). I 
> propose to guide folks to the latest stable release by adding a link on every 
> documentation page in the old docs that links to the home page of the latest 
> stable docs. 
> The redirect lives at flink.apache.org/q/stable-docs.html, and will need to 
> be modified with each major release (e.g. when 1.2 is released).
> This problem affects all releases before 1.1, but the stats show that 0.8, 
> 0.9, 0.10, and 1.0 are the most important to deal with.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3240: [FLINK-5575][docs] in old releases, warn users and guide ...

2017-02-06 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3240
  
Could you please close this PR manually? Bot closing only works for the 
master branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854384#comment-15854384
 ] 

ASF GitHub Bot commented on FLINK-5575:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/3239
  
Could you please close this PR manually? Bot closing only works for the 
master branch.


> in old releases, warn users and guide them to the latest stable docs
> 
>
> Key: FLINK-5575
> URL: https://issues.apache.org/jira/browse/FLINK-5575
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>
> Old versions of Flink (especially version 0.8) are being frequently studied, 
> downloaded, and used by new users (because Google leads them there). I 
> propose to guide folks to the latest stable release by adding a link on every 
> documentation page in the old docs that links to the home page of the latest 
> stable docs. 
> The redirect lives at flink.apache.org/q/stable-docs.html, and will need to 
> be modified with each major release (e.g. when 1.2 is released).
> This problem affects all releases before 1.1, but the stats show that 0.8, 
> 0.9, 0.10, and 1.0 are the most important to deal with.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854383#comment-15854383
 ] 

ASF GitHub Bot commented on FLINK-5575:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/3240
  
Could you please close this PR manually? Bot closing only works for the 
master branch.


> in old releases, warn users and guide them to the latest stable docs
> 
>
> Key: FLINK-5575
> URL: https://issues.apache.org/jira/browse/FLINK-5575
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>
> Old versions of Flink (especially version 0.8) are being frequently studied, 
> downloaded, and used by new users (because Google leads them there). I 
> propose to guide folks to the latest stable release by adding a link on every 
> documentation page in the old docs that links to the home page of the latest 
> stable docs. 
> The redirect lives at flink.apache.org/q/stable-docs.html, and will need to 
> be modified with each major release (e.g. when 1.2 is released).
> This problem affects all releases before 1.1, but the stats show that 0.8, 
> 0.9, 0.10, and 1.0 are the most important to deal with.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3239: [FLINK-5575][docs] in old releases, warn users and guide ...

2017-02-06 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3239
  
Could you please close this PR manually? Bot closing only works for the 
master branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854382#comment-15854382
 ] 

ASF GitHub Bot commented on FLINK-5575:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/3241
  
Could you please close this PR manually? Bot closing only works for the 
master branch.


> in old releases, warn users and guide them to the latest stable docs
> 
>
> Key: FLINK-5575
> URL: https://issues.apache.org/jira/browse/FLINK-5575
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>
> Old versions of Flink (especially version 0.8) are being frequently studied, 
> downloaded, and used by new users (because Google leads them there). I 
> propose to guide folks to the latest stable release by adding a link on every 
> documentation page in the old docs that links to the home page of the latest 
> stable docs. 
> The redirect lives at flink.apache.org/q/stable-docs.html, and will need to 
> be modified with each major release (e.g. when 1.2 is released).
> This problem affects all releases before 1.1, but the stats show that 0.8, 
> 0.9, 0.10, and 1.0 are the most important to deal with.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5575) in old releases, warn users and guide them to the latest stable docs

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854385#comment-15854385
 ] 

ASF GitHub Bot commented on FLINK-5575:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/3238
  
Could you please close this PR manually? Bot closing only works for the 
master branch.


> in old releases, warn users and guide them to the latest stable docs
> 
>
> Key: FLINK-5575
> URL: https://issues.apache.org/jira/browse/FLINK-5575
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>
> Old versions of Flink (especially version 0.8) are being frequently studied, 
> downloaded, and used by new users (because Google leads them there). I 
> propose to guide folks to the latest stable release by adding a link on every 
> documentation page in the old docs that links to the home page of the latest 
> stable docs. 
> The redirect lives at flink.apache.org/q/stable-docs.html, and will need to 
> be modified with each major release (e.g. when 1.2 is released).
> This problem affects all releases before 1.1, but the stats show that 0.8, 
> 0.9, 0.10, and 1.0 are the most important to deal with.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3241: [FLINK-5575][docs] in old releases, warn users and guide ...

2017-02-06 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3241
  
Could you please close this PR manually? Bot closing only works for the 
master branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

