[hotfix][Kafka] Refactor properties for KafkaTestEnvironment setup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e11a5919 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e11a5919 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e11a5919 Branch: refs/heads/master Commit: e11a591956ba608308ccf81e13030291f150739b Parents: b7f96f7 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Mon Aug 7 15:53:35 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Tue Aug 8 10:13:02 2017 +0200 ---------------------------------------------------------------------- .../kafka/KafkaTestEnvironmentImpl.java | 33 ++++++------- .../kafka/KafkaTestEnvironmentImpl.java | 19 ++++---- .../kafka/KafkaTestEnvironmentImpl.java | 35 +++++++------- .../kafka/KafkaShortRetentionTestBase.java | 2 +- .../connectors/kafka/KafkaTestBase.java | 2 +- .../connectors/kafka/KafkaTestEnvironment.java | 49 ++++++++++++++++++-- 6 files changed, 88 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index d3b45a9..9f1d379 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -79,8 +79,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { private String zookeeperConnectionString; private String brokerConnectionString = ""; private Properties standardProps; - private Properties additionalServerProperties; - private boolean secureMode = false; + private Config config; // 6 seconds is default. Seems to be too small for travis. 30 seconds private int zkTimeout = 30000; @@ -96,7 +95,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { @Override public Properties getSecureProperties() { Properties prop = new Properties(); - if (secureMode) { + if (config.isSecureMode()) { prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); prop.put("security.protocol", "SASL_PLAINTEXT"); prop.put("sasl.kerberos.service.name", "kafka"); @@ -215,26 +214,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { + public void prepare(Config config) { //increase the timeout since in Travis ZK connection takes long time for secure connection. - if (secureMode) { + if (config.isSecureMode()) { //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout - numKafkaServers = 1; + config.setKafkaServersNumber(1); zkTimeout = zkTimeout * 15; } + this.config = config; - this.additionalServerProperties = additionalServerProperties; - this.secureMode = secureMode; File tempDir = new File(System.getProperty("java.io.tmpdir")); - tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString())); assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); - tmpKafkaDirs = new ArrayList<>(numKafkaServers); - for (int i = 0; i < numKafkaServers; i++) { + tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber()); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { File tmpDir = new File(tmpKafkaParent, "server-" + i); assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); tmpKafkaDirs.add(tmpDir); @@ -249,12 +246,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString); LOG.info("Starting KafkaServer"); - brokers = new ArrayList<>(numKafkaServers); + brokers = new ArrayList<>(config.getKafkaServersNumber()); - for (int i = 0; i < numKafkaServers; i++) { + for (int i = 0; i < config.getKafkaServersNumber(); i++) { brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); - if (secureMode) { + if (config.isSecureMode()) { brokerConnectionString += hostAndPortToUrlString( KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort( @@ -347,7 +344,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { final long deadline = System.nanoTime() + 30_000_000_000L; do { try { - if (secureMode) { + if (config.isSecureMode()) { //increase wait time since in Travis ZK timeout occurs frequently int wait = zkTimeout / 100; LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic); @@ -407,8 +404,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { // for CI stability, increase zookeeper session timeout kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout); kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout); - if (additionalServerProperties != null) { - kafkaProperties.putAll(additionalServerProperties); + if (config.getKafkaServerProperties() != null) { + kafkaProperties.putAll(config.getKafkaServerProperties()); } final int numTries = 5; @@ -418,7 +415,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { kafkaProperties.put("port", Integer.toString(kafkaPort)); //to support secure kafka cluster - if (secureMode) { + if (config.isSecureMode()) { LOG.info("Adding Kafka secure configurations"); kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index ab976e1..af5ad67 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -84,7 +84,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { private String zookeeperConnectionString; private String brokerConnectionString = ""; private Properties standardProps; - private Properties additionalServerProperties; + + private Config config; public String getBrokerConnectionString() { return brokerConnectionString; @@ -206,8 +207,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { - this.additionalServerProperties = additionalServerProperties; + public void prepare(Config config) { + this.config = config; File tempDir = new File(System.getProperty("java.io.tmpdir")); tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); @@ -224,8 +225,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { fail("cannot create kafka temp dir: " + e.getMessage()); } - tmpKafkaDirs = new ArrayList<>(numKafkaServers); - for (int i = 0; i < numKafkaServers; i++) { + tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber()); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { File tmpDir = new File(tmpKafkaParent, "server-" + i); assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); tmpKafkaDirs.add(tmpDir); @@ -240,9 +241,9 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { zookeeperConnectionString = zookeeper.getConnectString(); LOG.info("Starting KafkaServer"); - brokers = new ArrayList<>(numKafkaServers); + brokers = new ArrayList<>(config.getKafkaServersNumber()); - for (int i = 0; i < numKafkaServers; i++) { + for (int i = 0; i < config.getKafkaServersNumber(); i++) { brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); SocketServer socketServer = brokers.get(i).socketServer(); @@ -391,8 +392,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { // for CI stability, increase zookeeper session timeout kafkaProperties.put("zookeeper.session.timeout.ms", "30000"); kafkaProperties.put("zookeeper.connection.timeout.ms", "30000"); - if (additionalServerProperties != null) { - kafkaProperties.putAll(additionalServerProperties); + if (config.getKafkaServerProperties() != null) { + kafkaProperties.putAll(config.getKafkaServerProperties()); } final int numTries = 5; http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index df95420..517f096 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -78,11 +78,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { private String zookeeperConnectionString; private String brokerConnectionString = ""; private Properties standardProps; - private Properties additionalServerProperties; - private boolean secureMode = false; // 6 seconds is default. Seems to be too small for travis. 30 seconds private String zkTimeout = "30000"; + private Config config; + public String getBrokerConnectionString() { return brokerConnectionString; } @@ -200,27 +200,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public void prepare(int numKafkaServers, Properties additionalServerProperties, boolean secureMode) { - + public void prepare(Config config) { //increase the timeout since in Travis ZK connection takes long time for secure connection. - if (secureMode) { + if (config.isSecureMode()) { //run only one kafka server to avoid multiple ZK connections from many instances - Travis timeout - numKafkaServers = 1; + config.setKafkaServersNumber(1); zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) * 15); } + this.config = config; - this.additionalServerProperties = additionalServerProperties; - this.secureMode = secureMode; File tempDir = new File(System.getProperty("java.io.tmpdir")); - tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString())); assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs()); tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString())); assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs()); - tmpKafkaDirs = new ArrayList<>(numKafkaServers); - for (int i = 0; i < numKafkaServers; i++) { + tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber()); + for (int i = 0; i < config.getKafkaServersNumber(); i++) { File tmpDir = new File(tmpKafkaParent, "server-" + i); assertTrue("cannot create kafka temp dir", tmpDir.mkdir()); tmpKafkaDirs.add(tmpDir); @@ -236,13 +233,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { LOG.info("zookeeperConnectionString: {}", zookeeperConnectionString); LOG.info("Starting KafkaServer"); - brokers = new ArrayList<>(numKafkaServers); + brokers = new ArrayList<>(config.getKafkaServersNumber()); - for (int i = 0; i < numKafkaServers; i++) { + for (int i = 0; i < config.getKafkaServersNumber(); i++) { brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); SocketServer socketServer = brokers.get(i).socketServer(); - if (secureMode) { + if (this.config.isSecureMode()) { brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ","; } else { brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ","; @@ -335,7 +332,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { final long deadline = System.nanoTime() + Integer.parseInt(zkTimeout) * 1_000_000L; do { try { - if (secureMode) { + if (config.isSecureMode()) { //increase wait time since in Travis ZK timeout occurs frequently int wait = Integer.parseInt(zkTimeout) / 100; LOG.info("waiting for {} msecs before the topic {} can be checked", wait, topic); @@ -400,8 +397,8 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { // for CI stability, increase zookeeper session timeout kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout); kafkaProperties.put("zookeeper.connection.timeout.ms", zkTimeout); - if (additionalServerProperties != null) { - kafkaProperties.putAll(additionalServerProperties); + if (config.getKafkaServerProperties() != null) { + kafkaProperties.putAll(config.getKafkaServerProperties()); } final int numTries = 5; @@ -411,7 +408,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { kafkaProperties.put("port", Integer.toString(kafkaPort)); //to support secure kafka cluster - if (secureMode) { + if (config.isSecureMode()) { LOG.info("Adding Kafka secure configurations"); kafkaProperties.put("listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); kafkaProperties.put("advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort); @@ -442,7 +439,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { public Properties getSecureProperties() { Properties prop = new Properties(); - if (secureMode) { + if (config.isSecureMode()) { prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); prop.put("security.protocol", "SASL_PLAINTEXT"); prop.put("sasl.kerberos.service.name", "kafka"); http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index d5c9276..3163f52 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -98,7 +98,7 @@ public class KafkaShortRetentionTestBase implements Serializable { specificProperties.setProperty("log.retention.minutes", "0"); specificProperties.setProperty("log.retention.ms", "250"); specificProperties.setProperty("log.retention.check.interval.ms", "100"); - kafkaServer.prepare(1, specificProperties, false); + kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties)); standardProps = kafkaServer.getStandardProperties(); http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index c484a4b..8eb0351 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -135,7 +135,7 @@ public abstract class KafkaTestBase extends TestLogger { LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); - kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode); + kafkaServer.prepare(kafkaServer.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(secureMode)); standardProps = kafkaServer.getStandardProperties(); http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 50eff23..ea292a9 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -38,15 +38,56 @@ import java.util.Properties; * Abstract class providing a Kafka test environment. */ public abstract class KafkaTestEnvironment { + /** + * Configuration class for {@link KafkaTestEnvironment}. + */ + public static class Config { + private int kafkaServersNumber = 1; + private Properties kafkaServerProperties = null; + private boolean secureMode = false; + + /** + * Please use {@link KafkaTestEnvironment#createConfig()} method. + */ + private Config() { + } + + public int getKafkaServersNumber() { + return kafkaServersNumber; + } + + public Config setKafkaServersNumber(int kafkaServersNumber) { + this.kafkaServersNumber = kafkaServersNumber; + return this; + } + + public Properties getKafkaServerProperties() { + return kafkaServerProperties; + } + + public Config setKafkaServerProperties(Properties kafkaServerProperties) { + this.kafkaServerProperties = kafkaServerProperties; + return this; + } + + public boolean isSecureMode() { + return secureMode; + } + + public Config setSecureMode(boolean secureMode) { + this.secureMode = secureMode; + return this; + } + } protected static final String KAFKA_HOST = "localhost"; - public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode); - - public void prepare(int numberOfKafkaServers, boolean secureMode) { - this.prepare(numberOfKafkaServers, null, secureMode); + public static Config createConfig() { + return new Config(); } + public abstract void prepare(Config config); + public abstract void shutdown(); public abstract void deleteTestTopic(String topic);