[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190530647
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -240,7 +249,9 @@ public void run() {
					newPartitions = unassignedPartitionsQueue.getBatchBlocking();
				}
				if (newPartitions != null) {
-					reassignPartitions(newPartitions);
+					reassignPartitions(newPartitions, new HashSet<>());
--- End diff --

I just realized this should actually be
`reassignPartitions(newPartitions, partitionsToBeRemoved);`
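
A minimal sketch of the corrected call site, assuming `partitionsToBeRemoved` is the `Set` field discussed elsewhere in this thread (not the PR's exact code):

```java
// Inside KafkaConsumerThread.run(): pass the pending removals to
// reassignPartitions() together with the newly discovered partitions,
// instead of an empty set that would ignore them.
if (newPartitions != null) {
	reassignPartitions(newPartitions, partitionsToBeRemoved);
}
```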


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190529981
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -374,8 +385,8 @@ void setOffsetsToCommit(
	 * This method is exposed for testing purposes.
	 */
	@VisibleForTesting
-	void reassignPartitions(List<KafkaTopicPartitionState> newPartitions) throws Exception {
-		if (newPartitions.size() == 0) {
+	void reassignPartitions(List<KafkaTopicPartitionState> newPartitions, Set partitionsToBeRemoved) throws Exception {
--- End diff --

I thought about it, but my only concern is the case where we have both partitions to add and partitions to remove: `consumerCallBridge.assignPartitions()` takes the whole new list of partitions, so in that case we would need to wait for the first assignment (e.g. adding the new partitions) before doing the second one (e.g. removing partitions) in order to keep the partition list consistent. I think we should try to have only one call to `consumerCallBridge.assignPartitions()`.
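
For illustration, a hedged sketch of what a single-call reassignment could look like; `consumer`, `consumerCallBridge`, and the generic parameters are assumed from context, not taken from the PR:

```java
void reassignPartitions(
		List<KafkaTopicPartitionState<TopicPartition>> newPartitions,
		Set<TopicPartition> partitionsToBeRemoved) throws Exception {

	List<TopicPartition> newAssignment = new ArrayList<>();

	// Keep every currently assigned partition that is not scheduled for removal.
	for (TopicPartition tp : consumer.assignment()) {
		if (!partitionsToBeRemoved.contains(tp)) {
			newAssignment.add(tp);
		}
	}

	// Append the newly discovered partitions.
	for (KafkaTopicPartitionState<TopicPartition> partition : newPartitions) {
		newAssignment.add(partition.getKafkaPartitionHandle());
	}

	// Single call: additions and removals take effect together, so the
	// consumer never observes an intermediate, inconsistent assignment.
	consumerCallBridge.assignPartitions(consumer, newAssignment);
}
```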

Maybe I could refactor the part where partitions are removed from the old assignment into a separate private method like `removeFromOldPartitions()`?

What do you think?


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190519624
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
			Pattern topicPattern,
			KeyedDeserializationSchema deserializer,
			long discoveryIntervalMillis,
-			boolean useMetrics) {
+			boolean useMetrics,
+			boolean checkUnavailablePartitions) {
--- End diff --

I did it that way only because this is something new, so I thought you might want it to be configurable. But you are right, I cannot think of a case where we would prefer to keep the unavailable partitions.
I'll update the PR to make it the default behaviour if that's OK with you.


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-24 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190518142
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -80,6 +83,9 @@
	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
	private final ClosableBlockingQueue<KafkaTopicPartitionState> unassignedPartitionsQueue;

+	/** The list of partitions to be removed from kafka consumer. */
+	private final Set partitionsToBeRemoved;
--- End diff --

From my understanding, for unassigned partitions we can use a Queue because it does not matter which consumer takes the new partitions.
But we cannot use a Queue for partitions to be removed, because a partition can only be removed from the consumer that is actually subscribed to it.
Does that make sense?
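
A small sketch of that distinction, assuming the set holds `TopicPartition` handles (names other than the two fields from the diff are hypothetical):

```java
// Any consumer thread may take new work from the shared queue...
List<KafkaTopicPartitionState<TopicPartition>> newPartitions =
		unassignedPartitionsQueue.pollBatch();

// ...but a removal can only be carried out by the owning thread, which
// filters its own consumer's assignment against the shared removal set.
List<TopicPartition> retained = new ArrayList<>();
for (TopicPartition tp : consumer.assignment()) {
	if (!partitionsToBeRemoved.contains(tp)) {
		retained.add(tp);
	}
}
```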


---


[GitHub] flink issue #5991: [FLINK-9303] [kafka] Adding support for unassign dynamica...

2018-05-22 Thread EAlexRojas
Github user EAlexRojas commented on the issue:

https://github.com/apache/flink/pull/5991
  
PR updated, taking the comments into account.


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-16 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r188596584
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -80,6 +82,9 @@
	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
	private final ClosableBlockingQueue<KafkaTopicPartitionState> unassignedPartitionsQueue;

+	/** The list of partitions to be removed from kafka consumer. */
+	private final List partitionsToBeRemoved;
--- End diff --

You are right, a Set is better suited for all the calls to the `contains()` method: a HashSet gives constant-time lookups, whereas a List scans linearly.
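
A quick illustration of the cost difference (hypothetical micro-example, not PR code):

```java
// List.contains() scans linearly, so checking each of n assigned
// partitions against a removal List of size m costs O(n * m).
// Copying into a HashSet first makes each membership test O(1).
Set<TopicPartition> toRemove = new HashSet<>(removedPartitionsList);
for (TopicPartition tp : consumer.assignment()) {
	if (toRemove.contains(tp)) {
		// schedule tp for unassignment
	}
}
```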


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-16 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r188564095
  
--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
@@ -221,7 +221,8 @@ private FlinkKafkaConsumer08(
			getLong(
				checkNotNull(props, "props"),
				KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
-			!getBoolean(props, KEY_DISABLE_METRICS, false));
+			!getBoolean(props, KEY_DISABLE_METRICS, false),
+			getBoolean(props, KEY_CHECK_UNAVAILABLE_TOPICS, false));
--- End diff --

You're right, I'll change it


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-11 Thread EAlexRojas
GitHub user EAlexRojas opened a pull request:

https://github.com/apache/flink/pull/5991

[FLINK-9303] [kafka] Adding support for dynamically unassigning partitions from the Kafka consumer when they become unavailable

## What is the purpose of the change

This pull request adds an option on the Kafka consumer to check for unavailable partitions and unassign them from the consumer. That way the consumer does not request records from invalid partitions, which prevents log noise.

## Brief change log

- Modify the partition discovery system to check not only for new partitions, but also for partitions that are no longer available.
- Check whether partitions recovered from state are still available.
- Add an option on the Kafka consumer to activate these checks.

## Verifying this change

This change was verified as follows:

*Manually verified:*
- Create a job with a Kafka consumer listening to a topic pattern, with partition discovery activated and the property introduced in this PR set to true (a setup sketch follows below).
- Configure Kafka with the following properties:
   delete.topic.enable=true
   auto.create.topics.enable=false
- Create some topics matching the pattern.
- Run the job.
- While it is running, delete some of the topics.
- Verify that the partitions are unassigned and the job continues running without log noise.

*I guess this could be covered by e2e tests, but I'm not familiar with the system in place.*
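
For reference, a hedged sketch of a consumer setup matching the steps above; the property key for the new option is inferred from `KEY_CHECK_UNAVAILABLE_TOPICS` in the diff and may not match the final PR (a later comment makes the behaviour the default):

```java
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public class UnassignPartitionsDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");
		props.setProperty("group.id", "unassign-test");
		// Re-run partition discovery every 10 seconds.
		props.setProperty("flink.partition-discovery.interval-millis", "10000");
		// Hypothetical key for the option introduced in this PR.
		props.setProperty("flink.check-unavailable-topics", "true");

		// Subscribe by pattern so deleted topics can drop out dynamically.
		env.addSource(new FlinkKafkaConsumer09<>(
				Pattern.compile("test-topic-.*"), new SimpleStringSchema(), props))
			.print();

		env.execute("kafka-unassign-partitions-demo");
	}
}
```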

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EAlexRojas/flink kafka-unassign-partitions-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5991.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5991


commit a17d0dcdeaac5b2508f4748d08fd4cb879fa5033
Author: EAlexRojas <alexrojas235@...>
Date:   2018-04-18T14:35:57Z

[FLINK-9303] [kafka] Adding support for dynamically unassigning partitions from the Kafka consumer when they become unavailable
- Check for unavailable partitions recovered from state
- Use a Kafka consumer option to activate these validations




---


[GitHub] flink pull request #5789: [FLINK-9103] Using CanonicalHostName instead of IP...

2018-03-29 Thread EAlexRojas
GitHub user EAlexRojas opened a pull request:

https://github.com/apache/flink/pull/5789

[FLINK-9103] Using CanonicalHostName instead of IP for SSL connection on 
NettyClient

## What is the purpose of the change

This pull request makes the NettyClient use the canonical host name instead of the IP address for SSL communication. That way dynamic environments like Kubernetes can be fully supported, since certificates with wildcard DNS entries can be used.


## Brief change log

- Use the canonical host name (getCanonicalHostName()) instead of the IP address (getHostAddress()) to identify the server in the NettyClient
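
A hedged sketch of the core idea; the class, method, and variable names are assumed for illustration, not the PR's exact code:

```java
import java.net.InetSocketAddress;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

class ClientSslEngineSketch {

	// Creates the client-side SSLEngine for a given server address.
	static SSLEngine createClientSslEngine(SSLContext sslContext, InetSocketAddress server) {
		// Before: the engine was bound to the raw IP, e.g. "10.0.3.17",
		// which can never match a certificate issued for wildcard DNS names:
		//   String peerHost = server.getAddress().getHostAddress();

		// After: use the canonical host name, e.g.
		// "flink-taskmanager-2.default.svc.cluster.local", so a certificate
		// for "*.default.svc.cluster.local" verifies successfully.
		String peerHost = server.getAddress().getCanonicalHostName();

		return sslContext.createSSLEngine(peerHost, server.getPort());
	}
}
```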


## Verifying this change

This change is already covered by existing tests, such as:

NettyClientServerSslTest (org.apache.flink.runtime.io.network.netty)
   - testValidSslConnection
   - testSslHandshakeError 

Also manually verified the change by running a 4-node Kubernetes cluster with 1 JobManager and 3 TaskManagers, using wildcard DNS certificates, executing a stateful streaming program with parallelism set to 2, and verifying that all nodes are able to communicate with each other successfully.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency):  no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EAlexRojas/flink release-1.4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5789.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5789


commit 202672da7901fe7df912e6a057d6d0c29ccaf0fd
Author: EAlexRojas <alexrojas235@...>
Date:   2018-03-29T14:01:24Z

Using CanonicalHostName instead of IP for SSL connection on NettyClient




---