[GitHub] flink pull request #4955: [FLINK-7978][kafka] Ensure that transactional ids ...
Github user pnowojski closed the pull request at: https://github.com/apache/flink/pull/4955 ---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149326628 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Class responsible for generating transactional ids to use when communicating with Kafka. + * + * It guarantees that: + * - generated ids to use will never clash with ids to use from different subtasks + * - generated ids to abort will never clash with ids to abort from different subtasks + * - generated ids to use will never clash with ids to abort from different subtasks + * + * In other words, any particular generated id will always be assigned to one and only one subtask. + */ +public class TransactionalIdsGenerator { --- End diff -- another nitpick: `` tags are usually not closed in Javadoc: http://www.oracle.com/technetwork/articles/java/index-137868.html ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149322930 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java --- @@ -0,0 +1,86 @@ [...] +public class TransactionalIdsGenerator { --- End diff -- oops, forgot about it ---
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149316761 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link TransactionalIdsGenerator}. 
+ */ +public class TransactionalIdsGeneratorTest { + private static final int POOL_SIZE = 3; + private static final int SAFE_SCALE_DOWN_FACTOR = 3; + private static final int SUBTASKS_COUNT = 5; + + @Test + public void testGenerateIdsToUse() { + TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", 2, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR); + + assertEquals( + new HashSet<>(Arrays.asList("test-42", "test-43", "test-44")), + generator.generateIdsToUse(36)); + } + + /** +* Ids to abort and to use should never clash between subtasks. +*/ + @Test + public void testGeneratedIdsDoNotClash() { + List> idsToAbort = new ArrayList<>(); + List> idsToUse = new ArrayList<>(); + + for (int subtask = 0; subtask < SUBTASKS_COUNT; subtask++) { + TransactionalIdsGenerator generator = new TransactionalIdsGenerator("test", subtask, SUBTASKS_COUNT, POOL_SIZE, SAFE_SCALE_DOWN_FACTOR); + idsToUse.add(generator.generateIdsToUse(0)); + idsToAbort.add(generator.generateIdsToAbort()); + } + + for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) { + for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) { + if (subtask2 == subtask1) { + continue; + } + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1)); + assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1)); + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1)); + } + } + } + + private void assertIntersectionIsEmpty(Set first, Set second) { --- End diff -- Maybe `assertDisjoint` ---
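A minimal sketch of the helper under the name suggested above, assuming a plain `AssertionError` instead of JUnit's `Assert.fail` so that the snippet runs without JUnit on the classpath. The actual check is delegated to `java.util.Collections.disjoint`; the class name `DisjointAssertions` is hypothetical, and reporting the shared elements is an optional extra that makes a failing test easier to read.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper class; only the method name comes from the review comment. */
public class DisjointAssertions {

    /** Fails if the two sets share at least one element and lists the shared elements. */
    static <T> void assertDisjoint(Set<T> first, Set<T> second) {
        if (!Collections.disjoint(first, second)) {
            Set<T> common = new HashSet<>(first);
            common.retainAll(second);
            throw new AssertionError("Expected disjoint sets, but both contain " + common);
        }
    }

    public static void main(String[] args) {
        // Disjoint sets pass silently.
        assertDisjoint(
            new HashSet<>(Arrays.asList("test-0", "test-1")),
            new HashSet<>(Arrays.asList("test-2", "test-3")));

        // Overlapping sets are reported together with the clashing id.
        try {
            assertDisjoint(
                new HashSet<>(Arrays.asList("test-0", "test-1")),
                new HashSet<>(Arrays.asList("test-1", "test-2")));
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // Expected disjoint sets, but both contain [test-1]
        }
    }
}
```

Compared with a name like `assertIntersectionIsEmpty`, `assertDisjoint` states the intended property directly, and `Collections.disjoint` avoids building an intersection set on the success path.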
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149312052 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java --- @@ -0,0 +1,79 @@ [...] + for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) { + for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) { + if (subtask2 == subtask1) { + continue; + } + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1)); + assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1)); + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1)); --- End diff -- what about `assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToAbort.get(subtask1));`? ---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149315581 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java --- @@ -0,0 +1,79 @@ [...] + for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) { + for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) { + if (subtask2 == subtask1) { + continue; + } + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1)); + assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1)); + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1)); --- End diff -- ah you're right, thanks for the explanation :) ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149315173 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGeneratorTest.java --- @@ -0,0 +1,79 @@ [...] + for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) { + for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) { + if (subtask2 == subtask1) { + continue; + } + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToAbort.get(subtask1)); + assertIntersectionIsEmpty(idsToUse.get(subtask2), idsToUse.get(subtask1)); + assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1)); --- End diff -- That case is covered by the second half of the looping: the inner loop always starts from index 0, so every ordered pair of subtasks is visited in both directions. The extra check would only be necessary if the loop looked like:

```
for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
    for (int subtask2 = subtask1; subtask2 < SUBTASKS_COUNT; subtask2++) {
```

instead of how it is now:

```
for (int subtask1 = 0; subtask1 < SUBTASKS_COUNT; subtask1++) {
    for (int subtask2 = 0; subtask2 < SUBTASKS_COUNT; subtask2++) {
```

---
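To make the argument concrete, here is a small sketch that records which (abort owner, use owner) subtask pairs the single `idsToAbort.get(subtask2)` vs `idsToUse.get(subtask1)` check visits under each of the two loop shapes quoted above, with 5 subtasks as in the test. The class `PairCoverage` and its methods are hypothetical; each visited pair is encoded as the string `"abortOwner->useOwner"`.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: which (abortOwner -> useOwner) pairs does one directional check visit? */
public class PairCoverage {

    /** Loop shape used in testGeneratedIdsDoNotClash: the inner loop starts at 0. */
    static Set<String> fullLoop(int subtasksCount) {
        Set<String> visited = new HashSet<>();
        for (int subtask1 = 0; subtask1 < subtasksCount; subtask1++) {
            for (int subtask2 = 0; subtask2 < subtasksCount; subtask2++) {
                if (subtask2 == subtask1) {
                    continue;
                }
                // Stands in for assertIntersectionIsEmpty(idsToAbort.get(subtask2), idsToUse.get(subtask1)).
                visited.add(subtask2 + "->" + subtask1);
            }
        }
        return visited;
    }

    /** Alternative shape from the comment: the inner loop starts at subtask1. */
    static Set<String> triangularLoop(int subtasksCount) {
        Set<String> visited = new HashSet<>();
        for (int subtask1 = 0; subtask1 < subtasksCount; subtask1++) {
            for (int subtask2 = subtask1; subtask2 < subtasksCount; subtask2++) {
                if (subtask2 == subtask1) {
                    continue;
                }
                visited.add(subtask2 + "->" + subtask1);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        int subtasksCount = 5; // SUBTASKS_COUNT in the test
        System.out.println("full loop: " + fullLoop(subtasksCount).size() + " pairs");             // 20
        System.out.println("triangular loop: " + triangularLoop(subtasksCount).size() + " pairs"); // 10
        // The triangular loop only sees abortOwner > useOwner, so the mirrored assert would be needed.
        System.out.println(triangularLoop(subtasksCount).contains("0->1"));                        // false
    }
}
```

All 5 × 4 = 20 ordered pairs are covered by the full loop, so the mirrored assertion would be redundant there; the triangular loop covers only half of them.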
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149315106 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java --- @@ -0,0 +1,86 @@ [...] +public class TransactionalIdsGenerator { --- End diff -- It is better to use the unordered list tag, e.g.,

```
* <ul>
* <li>generated ids to use will never clash with ids to use from different subtasks
* <li>generated ids to abort will never clash with ids to abort from different subtasks
* <li>generated ids to use will never clash with ids to abort from different subtasks
* </ul>
```

Otherwise it is rendered like this: ![image](https://user-images.githubusercontent.com/1681921/32487045-c97ffe8a-c3a8-11e7-95e8-f3ee127072b9.png) ---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4955#discussion_r149310451 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/TransactionalIdsGenerator.java --- @@ -19,27 +19,35 @@ import java.util.HashSet; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.LongStream; import static org.apache.flink.util.Preconditions.checkNotNull; /** * Class responsible for generating transactional ids to use when communicating with Kafka. + * + * It guarantees that: + * - generated ids to use will never clash with ids to use from different subtasks + * - generated ids to abort will never clash with ids to abort from different subtasks + * - generated ids to use will never clash with ids to abort from different subtasks + * + * In other words, any particular generated id will always be assigned to one and only one subtask. */ public class TransactionalIdsGenerator { private final String prefix; private final int subtaskIndex; + private final int totalNumberOfSubtasks; private final int poolSize; private final int safeScaleDownFactor; public TransactionalIdsGenerator( String prefix, int subtaskIndex, + int totalNumberOfSubtasks, int poolSize, int safeScaleDownFactor) { this.prefix = checkNotNull(prefix); this.subtaskIndex = subtaskIndex; + this.totalNumberOfSubtasks = totalNumberOfSubtasks; --- End diff -- Maybe we should add some argument checks for subtask index and totalNumberOfSubtasks, at least. ---
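A minimal sketch of the argument checks suggested above, assuming they live in the constructor and throw `IllegalArgumentException`. The class name `ValidatedIdsGeneratorArgs` and the local `checkArgument` helper are hypothetical; inside Flink, `org.apache.flink.util.Preconditions.checkArgument` would be the natural replacement for the helper. The checks on `poolSize` and `safeScaleDownFactor` go beyond the review comment and are assumptions.

```java
import java.util.Objects;

/** Hypothetical sketch of constructor argument checks; not the code from PR #4955. */
public class ValidatedIdsGeneratorArgs {
    private final String prefix;
    private final int subtaskIndex;
    private final int totalNumberOfSubtasks;
    private final int poolSize;
    private final int safeScaleDownFactor;

    public ValidatedIdsGeneratorArgs(
            String prefix, int subtaskIndex, int totalNumberOfSubtasks, int poolSize, int safeScaleDownFactor) {
        this.prefix = Objects.requireNonNull(prefix, "prefix");
        // The two checks requested in the review.
        checkArgument(totalNumberOfSubtasks > 0,
            "totalNumberOfSubtasks must be positive, got " + totalNumberOfSubtasks);
        checkArgument(subtaskIndex >= 0 && subtaskIndex < totalNumberOfSubtasks,
            "subtaskIndex must be in [0, " + totalNumberOfSubtasks + "), got " + subtaskIndex);
        // Assumed additional checks: an empty pool or a zero factor would generate no ids at all.
        checkArgument(poolSize > 0, "poolSize must be positive, got " + poolSize);
        checkArgument(safeScaleDownFactor > 0, "safeScaleDownFactor must be positive, got " + safeScaleDownFactor);
        this.subtaskIndex = subtaskIndex;
        this.totalNumberOfSubtasks = totalNumberOfSubtasks;
        this.poolSize = poolSize;
        this.safeScaleDownFactor = safeScaleDownFactor;
    }

    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        new ValidatedIdsGeneratorArgs("test", 2, 5, 3, 3); // valid: subtask 2 of 5
        try {
            new ValidatedIdsGeneratorArgs("test", 5, 5, 3, 3); // index equals the subtask count
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // subtaskIndex must be in [0, 5), got 5
        }
    }
}
```

Failing fast here matters because an out-of-range `subtaskIndex` would silently produce ids that overlap another subtask's range rather than an obvious error.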
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4955

[FLINK-7978][kafka] Ensure that transactional ids will never clash

## What is the purpose of the change

Previously, transactional ids to use and to abort could clash between subtasks. This could lead to a race condition between initialization and writing the data, where one subtask is still initializing/aborting some transactional id while a different subtask is already trying to write data using the same transactional id.

## Brief change log

The first commit extracts the `TransactionalIdsGenerator` logic and is a pure refactor, without any functional change. The second one is the actual fix. I would like to merge these two commits as they are, without squashing, so that the bug fix is easier to understand and read in the commit history.

## Verifying this change

This fixes a bug and adds test coverage (`TransactionalIdsGeneratorTest`) for future regressions.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f7978

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4955.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4955

commit 54c1729fed93e51f416d365c65cbd64f67c27321
Author: Piotr Nowojski
Date: 2017-11-06T13:03:16Z

[hotfix][kafka] Extract TransactionalIdsGenerator class from FlinkKafkaProducer011

This is a pure refactor without any functional changes.

commit 96bb26e5d9289fb8393f0807936792d89138a744
Author: Piotr Nowojski
Date: 2017-11-06T13:14:01Z

[FLINK-7978][kafka] Ensure that transactional ids will never clash

Previously transactional ids to use and to abort could clash between subtasks. This could lead to a race condition between initialization and writing the data, where one subtask is still initializing/aborting some transactional id while a different subtask is already trying to write the data using the same transactional id.

---
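The fix can be illustrated with a minimal sketch of an id scheme that satisfies the three guarantees listed in the `TransactionalIdsGenerator` Javadoc. The `generateIdsToUse` layout is inferred from the expected values in `testGenerateIdsToUse` (subtask 2, pool size 3, base 36 gives `test-42` to `test-44`, i.e. `36 + 2 * 3 + i`). The `generateIdsToAbort` layout is purely an assumption, chosen so that the disjointness properties hold, and the class name `IdsGeneratorSketch` is hypothetical. This is not the code from the PR.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch of a clash-free transactional id scheme; NOT the code from PR #4955.
 * generateIdsToUse matches testGenerateIdsToUse; the abort layout is an assumption.
 */
public class IdsGeneratorSketch {
    private final String prefix;
    private final int subtaskIndex;
    private final int totalNumberOfSubtasks;
    private final int poolSize;
    private final int safeScaleDownFactor;

    public IdsGeneratorSketch(
            String prefix, int subtaskIndex, int totalNumberOfSubtasks, int poolSize, int safeScaleDownFactor) {
        this.prefix = prefix;
        this.subtaskIndex = subtaskIndex;
        this.totalNumberOfSubtasks = totalNumberOfSubtasks;
        this.poolSize = poolSize;
        this.safeScaleDownFactor = safeScaleDownFactor;
    }

    /** Each subtask owns poolSize consecutive ids starting at base + subtaskIndex * poolSize. */
    public Set<String> generateIdsToUse(long nextFreeTransactionalId) {
        Set<String> ids = new HashSet<>();
        for (int i = 0; i < poolSize; i++) {
            long id = nextFreeTransactionalId + (long) subtaskIndex * poolSize + i;
            ids.add(prefix + "-" + id);
        }
        return ids;
    }

    /** Repeats the subtask's slice in safeScaleDownFactor blocks of poolSize * totalNumberOfSubtasks ids. */
    public Set<String> generateIdsToAbort() {
        Set<String> ids = new HashSet<>();
        for (int block = 0; block < safeScaleDownFactor; block++) {
            ids.addAll(generateIdsToUse((long) block * poolSize * totalNumberOfSubtasks));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Same parameters as testGenerateIdsToUse: subtask 2 of 5, pool size 3, scale-down factor 3.
        IdsGeneratorSketch generator = new IdsGeneratorSketch("test", 2, 5, 3, 3);
        System.out.println(new TreeSet<>(generator.generateIdsToUse(36))); // [test-42, test-43, test-44]
        System.out.println(new TreeSet<>(generator.generateIdsToAbort()));
    }
}
```

With 5 subtasks and a pool size of 3, subtask `s` only ever receives ids whose value modulo 15 lies in `[3s, 3s + 3)`, so with base 0 (as in `testGeneratedIdsDoNotClash`) no id is shared between subtasks. In this sketch a non-zero base can land in another subtask's abort range (e.g. subtask 0 with base 36 reuses subtask 2's abort ids 36 to 38), so the guarantee is only demonstrated for base 0.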