[hotfix] [kafka-tests] Clean up FlinkKafkaProducer011Tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7d3589e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7d3589e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7d3589e Branch: refs/heads/master Commit: b7d3589e7732b6458ac4c0ad936666d670cac87b Parents: 8cdf2ff Author: gyao <[email protected]> Authored: Thu Oct 26 19:25:35 2017 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Nov 2 12:32:28 2017 +0800 ---------------------------------------------------------------------- .../kafka/FlinkKafkaProducer011Tests.java | 49 +------------------- 1 file changed, 1 insertion(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b7d3589e/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java index 69c3ceb..381ba33 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; @@ -36,13 +35,10 @@ import kafka.server.KafkaServer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.ProducerFencedException; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -68,7 +64,7 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase { protected TypeInformationSerializationSchema<Integer> integerSerializationSchema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema = - new KeyedSerializationSchemaWrapper(integerSerializationSchema); + new KeyedSerializationSchemaWrapper<>(integerSerializationSchema); @Before public void before() { @@ -83,49 +79,6 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase { extraProperties.put("isolation.level", "read_committed"); } - @Test(timeout = 30000L) - public void testHappyPath() throws IOException { - String topicName = "flink-kafka-producer-happy-path"; - try (Producer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) { - kafkaProducer.initTransactions(); - kafkaProducer.beginTransaction(); - kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42")); - kafkaProducer.commitTransaction(); - } - assertRecord(topicName, "42", "42"); - deleteTestTopic(topicName); - } - - @Test(timeout = 30000L) - public void testResumeTransaction() throws IOException { - String topicName = "flink-kafka-producer-resume-transaction"; - try (FlinkKafkaProducer<String, String> kafkaProducer = new FlinkKafkaProducer<>(extraProperties)) { - kafkaProducer.initTransactions(); - kafkaProducer.beginTransaction(); - kafkaProducer.send(new ProducerRecord<>(topicName, "42", "42")); - kafkaProducer.flush(); - long producerId = kafkaProducer.getProducerId(); - short epoch = kafkaProducer.getEpoch(); - - try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) { - resumeProducer.resumeTransaction(producerId, epoch); - resumeProducer.commitTransaction(); - } - - assertRecord(topicName, "42", "42"); - - // this shouldn't throw - in case of network split, old producer might attempt to commit it's transaction - kafkaProducer.commitTransaction(); - - // this shouldn't fail also, for same reason as above - try (FlinkKafkaProducer<String, String> resumeProducer = new FlinkKafkaProducer<>(extraProperties)) { - resumeProducer.resumeTransaction(producerId, epoch); - resumeProducer.commitTransaction(); - } - } - deleteTestTopic(topicName); - } - @Test(timeout = 120_000L) public void testFlinkKafkaProducer011FailBeforeNotify() throws Exception { String topic = "flink-kafka-producer-fail-before-notify";
