[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-08 Thread pnowojski
Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4955


---


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149326628
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class responsible for generating transactional ids to use when communicating with Kafka.
+ *
+ * It guarantees that:
+ * - generated ids to use will never clash with ids to use from different subtasks
+ * - generated ids to abort will never clash with ids to abort from different subtasks
+ * - generated ids to use will never clash with ids to abort from different subtasks
+ *
+ * In other words, any particular generated id will always be assigned to one and only one subtask.
+ */
+public class TransactionalIdsGenerator {
--- End diff --

another nitpick 😉: `<li>` tags are usually not closed in Javadoc: 
http://www.oracle.com/technetwork/articles/java/index-137868.html
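
For reference, a minimal sketch of that convention (an illustrative class, not from the PR): in the usual Javadoc style, `<p>` and `<li>` open a new paragraph or list item and are left unclosed.

```java
/**
 * Illustrative example of the unclosed-tag Javadoc convention.
 *
 * <p>It guarantees that:
 * <ul>
 * <li>item one
 * <li>item two
 * </ul>
 *
 * <p>A second paragraph, again without a closing tag.
 */
public class JavadocStyleExample {}
```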


---


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149322930
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class responsible for generating transactional ids to use when communicating with Kafka.
+ *
+ * It guarantees that:
+ * - generated ids to use will never clash with ids to use from different subtasks
+ * - generated ids to abort will never clash with ids to abort from different subtasks
+ * - generated ids to use will never clash with ids to abort from different subtasks
+ *
+ * In other words, any particular generated id will always be assigned to one and only one subtask.
+ */
+public class TransactionalIdsGenerator {
--- End diff --

oops, forgot about it


---


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149316761
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+   private static final int POOL_SIZE = 3;
+   private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+   private static final int SUBTASKS_COUNT = 5;
+
+   @Test
+   public void testGenerateIdsToUse() {
+   TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+   assertEquals(
+   new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+   generator.generateIdsToUse(36));
+   }
+
+   /**
+* Ids to abort and to use should never clash between subtasks.
+*/
+   @Test
+   public void testGeneratedIdsDoNotClash() {
+   List<Set<String>> idsToAbort = new ArrayList<>();
+   List<Set<String>> idsToUse = new ArrayList<>();
+
+   for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+   TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+   idsToUse.add(generator.generateIdsToUse(0));
+   idsToAbort.add(generator.generateIdsToAbort());
+   }
+
+   for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+   for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+   if (subtask2 == subtask1) {
+   continue;
+   }
+   assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+   assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+   assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
+   }
+   }
+   }
+
+   private <T> void assertIntersectionIsEmpty(Set<T> first, Set<T> second) {
--- End diff --

Maybe `assertDisjoint`
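
For instance, such a renamed helper might look roughly like this (a sketch only, building on `java.util.Collections.disjoint`; class name and message are illustrative):

```java
import java.util.Collections;
import java.util.Set;

import static org.junit.Assert.assertTrue;

final class DisjointAssertions {

	private DisjointAssertions() {
	}

	/** Asserts that the two given sets have no elements in common. */
	static <T> void assertDisjoint(Set<T> first, Set<T> second) {
		assertTrue(
			"Expected disjoint sets, but " + first + " and " + second + " overlap",
			Collections.disjoint(first, second));
	}
}
```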


---


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149312052
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+   private static final int POOL_SIZE = 3;
+   private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+   private static final int SUBTASKS_COUNT = 5;
+
+   @Test
+   public void testGenerateIdsToUse() {
+   TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+   assertEquals(
+   new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+   generator.generateIdsToUse(36));
+   }
+
+   /**
+* Ids to abort and to use should never clash between subtasks.
+*/
+   @Test
+   public void testGeneratedIdsDoNotClash() {
+   List<Set<String>> idsToAbort = new ArrayList<>();
+   List<Set<String>> idsToUse = new ArrayList<>();
+
+   for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+   TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+   idsToUse.add(generator.generateIdsToUse(0));
+   idsToAbort.add(generator.generateIdsToAbort());
+   }
+
+   for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+   for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+   if (subtask2 == subtask1) {
+   continue;
+   }
+   assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+   assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+   assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
--- End diff --

what about `assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToAbort.get(subtask1));`?


---


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149315581
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+   private static final int POOL_SIZE = 3;
+   private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+   private static final int SUBTASKS_COUNT = 5;
+
+   @Test
+   public void testGenerateIdsToUse() {
+   TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+   assertEquals(
+   new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+   generator.generateIdsToUse(36));
+   }
+
+   /**
+* Ids to abort and to use should never clash between subtasks.
+*/
+   @Test
+   public void testGeneratedIdsDoNotClash() {
+   List<Set<String>> idsToAbort = new ArrayList<>();
+   List<Set<String>> idsToUse = new ArrayList<>();
+
+   for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+   TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+   idsToUse.add(generator.generateIdsToUse(0));
+   idsToAbort.add(generator.generateIdsToAbort());
+   }
+
+   for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+   for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+   if (subtask2 == subtask1) {
+   continue;
+   }
+   assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+   assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+   assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
--- End diff --

ah you're right, thanks for the explanation :)


---


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149315173
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TransactionalIdsGenerator}.
+ */
+public class TransactionalIdsGeneratorTest {
+   private static final int POOL_SIZE = 3;
+   private static final int SAFE_SCALE_DOWN_FACTOR = 3;
+   private static final int SUBTASKS_COUNT = 5;
+
+   @Test
+   public void testGenerateIdsToUse() {
+   TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+
+   assertEquals(
+   new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")),
+   generator.generateIdsToUse(36));
+   }
+
+   /**
+* Ids to abort and to use should never clash between subtasks.
+*/
+   @Test
+   public void testGeneratedIdsDoNotClash() {
+   List<Set<String>> idsToAbort = new ArrayList<>();
+   List<Set<String>> idsToUse = new ArrayList<>();
+
+   for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) {
+   TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR);
+   idsToUse.add(generator.generateIdsToUse(0));
+   idsToAbort.add(generator.generateIdsToAbort());
+   }
+
+   for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
+   for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
+   if (subtask2 == subtask1) {
+   continue;
+   }
+   assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1));
+   assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1));
+   assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1));
--- End diff --

that will be covered by the second half of the looping (every time I start from index 0). It would only be necessary if the loop looked like:
```
for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
for (int subtask2 = subtask1; subtask2 < SUBTASKS_COUNT; subtask2++) {
```
instead of as it is now:
```
for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
```
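
A tiny standalone illustration of that symmetry (hypothetical, not part of the PR): with both loops starting at 0, every unordered pair of distinct subtasks is visited in both orders, so a single use-vs-abort check per iteration covers both directions.

```java
import java.util.ArrayList;
import java.util.List;

public class OrderedPairsDemo {
	public static void main(String[] args) {
		final int subtasksCount = 3;
		List<String> visitedPairs = new ArrayList<>();
		for (int subtask1 = 0; subtask1 < subtasksCount; subtask1++) {
			for (int subtask2 = 0; subtask2 < subtasksCount; subtask2++) {
				if (subtask2 == subtask1) {
					continue;
				}
				visitedPairs.add("(" + subtask1 + ", " + subtask2 + ")");
			}
		}
		// Prints (0, 1), (0, 2), (1, 0), (1, 2), (2, 0), (2, 1) -- each pair in both orders.
		System.out.println(visitedPairs);
	}
}
```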


---


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149315106
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Class responsible for generating transactional ids to use when communicating with Kafka.
+ *
+ * It guarantees that:
+ * - generated ids to use will never clash with ids to use from different subtasks
+ * - generated ids to abort will never clash with ids to abort from different subtasks
+ * - generated ids to use will never clash with ids to abort from different subtasks
+ *
+ * In other words, any particular generated id will always be assigned to one and only one subtask.
+ */
+public class TransactionalIdsGenerator {
--- End diff --

It is better to use the unordered list tag `<ul>`, e.g.,
```
  * <ul>
  * <li>generated ids to use will never clash with ids to use from different subtasks</li>
  * <li>generated ids to abort will never clash with ids to abort from different subtasks</li>
  * <li>generated ids to use will never clash with ids to abort from different subtasks</li>
  * </ul>
```
Otherwise it is rendered like this:

![image](https://user-images.githubusercontent.com/1681921/32487045-c97ffe8a-c3a8-11e7-95e8-f3ee127072b9.png)



---


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4955#discussion_r149310451
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java
 ---
@@ -19,27 +19,35 @@
 
 import java.util.HashSet;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Class responsible for generating transactional ids to use when communicating with Kafka.
+ *
+ * It guarantees that:
+ * - generated ids to use will never clash with ids to use from different subtasks
+ * - generated ids to abort will never clash with ids to abort from different subtasks
+ * - generated ids to use will never clash with ids to abort from different subtasks
+ *
+ * In other words, any particular generated id will always be assigned to one and only one subtask.
  */
 public class TransactionalIdsGenerator {
private final String prefix;
private final int subtaskIndex;
+   private final int totalNumberOfSubtasks;
private final int poolSize;
private final int safeScaleDownFactor;
 
public TransactionalIdsGenerator(
String prefix,
int subtaskIndex,
+   int totalNumberOfSubtasks,
int poolSize,
int safeScaleDownFactor) {
this.prefix = checkNotNull(prefix);
this.subtaskIndex = subtaskIndex;
+   this.totalNumberOfSubtasks = totalNumberOfSubtasks;
--- End diff --

Maybe we should add some argument checks for subtask index and totalNumberOfSubtasks, at least.
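
Such checks might look roughly like the following (a hypothetical sketch using `org.apache.flink.util.Preconditions`; the class name and messages are illustrative, not the PR's code):

```java
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

// Hypothetical sketch of constructor-argument validation.
public class GuardedIdsGenerator {
	private final String prefix;
	private final int subtaskIndex;
	private final int totalNumberOfSubtasks;
	private final int poolSize;
	private final int safeScaleDownFactor;

	public GuardedIdsGenerator(
			String prefix,
			int subtaskIndex,
			int totalNumberOfSubtasks,
			int poolSize,
			int safeScaleDownFactor) {
		checkArgument(totalNumberOfSubtasks > 0, "totalNumberOfSubtasks must be positive");
		checkArgument(subtaskIndex >= 0 && subtaskIndex < totalNumberOfSubtasks,
			"subtaskIndex must be in [0, totalNumberOfSubtasks)");
		checkArgument(poolSize > 0, "poolSize must be positive");
		checkArgument(safeScaleDownFactor > 0, "safeScaleDownFactor must be positive");

		this.prefix = checkNotNull(prefix);
		this.subtaskIndex = subtaskIndex;
		this.totalNumberOfSubtasks = totalNumberOfSubtasks;
		this.poolSize = poolSize;
		this.safeScaleDownFactor = safeScaleDownFactor;
	}
}
```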


---


[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...

2017-11-06 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4955

[FLINK-7978][kafka] Ensure that transactional ids will never clash

## What is the purpose of the change

Previously, transactional ids to use and to abort could clash between 
subtasks. This could lead to a race condition between initialization and 
writing the data, where one subtask is still initializing/aborting some 
transactional id while a different subtask is already trying to write the 
data using the same transactional id.
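
As a rough sketch of the partitioning idea behind the fix (illustrative only, consistent with the expectations in `TransactionalIdsGeneratorTest` but not necessarily the exact code in the PR): each subtask draws its ids to use from a window offset by `subtaskIndex * poolSize`, and its ids to abort from the same per-subtask windows of a few earlier generations, so no two subtasks can ever touch the same id.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of per-subtask transactional id partitioning.
public class IdsPartitioningSketch {

	static Set<String> idsToUse(String prefix, int subtaskIndex, int poolSize, long nextFreeTransactionalId) {
		Set<String> ids = new HashSet<>();
		for (int i = 0; i < poolSize; i++) {
			// Each subtask owns a disjoint window of size poolSize starting at its own offset.
			ids.add(prefix + "-" + (nextFreeTransactionalId + subtaskIndex * poolSize + i));
		}
		return ids;
	}

	static Set<String> idsToAbort(
			String prefix, int subtaskIndex, int totalNumberOfSubtasks, int poolSize, int safeScaleDownFactor) {
		Set<String> ids = new HashSet<>();
		// Abort only ids from this subtask's windows in earlier "generations",
		// so aborting in one subtask can never touch ids owned by another.
		for (int i = 0; i < safeScaleDownFactor; i++) {
			ids.addAll(idsToUse(prefix, subtaskIndex, poolSize, (long) i * poolSize * totalNumberOfSubtasks));
		}
		return ids;
	}

	public static void main(String[] args) {
		// Matches the test expectation: subtask 2, pool size 3, next free id 36 -> test-42, test-43, test-44.
		System.out.println(idsToUse("test", 2, 3, 36));
	}
}
```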

## Brief change log

The first commit extracts the `TransactionalIdsGenerator` logic and is a pure 
refactor, without any functional change. The second one is the actual fix. I 
would like to merge those two commits as they are, without squashing, so that 
the bug fix is easier to understand/read in the commit history.

## Verifying this change

This fixes a bug and adds test coverage (`TransactionalIdsGeneratorTest`) 
to guard against future regressions.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f7978

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4955.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4955


commit 54c1729fed93e51f416d365c65cbd64f67c27321
Author: Piotr Nowojski 
Date:   2017-11-06T13:03:16Z

[hotfix][kafka] Extract TransactionalIdsGenerator class from 
FlinkKafkaProducer011

This is a pure refactor without any functional changes.

commit 96bb26e5d9289fb8393f0807936792d89138a744
Author: Piotr Nowojski 
Date:   2017-11-06T13:14:01Z

[FLINK-7978][kafka] Ensure that transactional ids will never clash

Previously, transactional ids to use and to abort could clash between
subtasks. This could lead to a race condition between initialization
and writing the data, where one subtask is still initializing/aborting
some transactional id while a different subtask is already trying to
write the data using the same transactional id.




---