This is an automated email from the ASF dual-hosted git repository. mmiklavcic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push: new 67fa5a4 METRON-2227 Increase Kafka test harness timeout (tigerquoll via mmiklavc) closes apache/metron#1493 67fa5a4 is described below commit 67fa5a403b01d0f7c8607c06e63f9d06f8b8cbc1 Author: tigerquoll <tigerqu...@outlook.com> AuthorDate: Wed Sep 4 11:47:04 2019 -0600 METRON-2227 Increase Kafka test harness timeout (tigerquoll via mmiklavc) closes apache/metron#1493 --- .../integration/components/KafkaComponent.java | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java index 08910be..0fa414b 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java @@ -65,6 +65,10 @@ import org.slf4j.LoggerFactory; public class KafkaComponent implements InMemoryComponent { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final long KAFKA_PROPAGATE_TIMEOUT_MS = 10000l; + public static final int ZK_SESSION_TIMEOUT_MS = 30000; + public static final int ZK_CONNECTION_TIMEOUT_MS = 30000; + public static final int KAFKA_ZOOKEEPER_TIMEOUT_MS = 1000000; public static class Topic { public int numPartitions; @@ -159,11 +163,11 @@ public class KafkaComponent implements InMemoryComponent { // setup Zookeeper zookeeperConnectString = topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY); - zkClient = new ZkClient(zookeeperConnectString, 30000, 30000, ZKStringSerializer$.MODULE$); + zkClient = new ZkClient(zookeeperConnectString, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$); // setup Broker Properties props = TestUtilsWrapper.createBrokerConfig(0, zookeeperConnectString, brokerPort); - props.setProperty("zookeeper.connection.timeout.ms","1000000"); + props.setProperty("zookeeper.connection.timeout.ms", Integer.toString(KAFKA_ZOOKEEPER_TIMEOUT_MS)); KafkaConfig config = new KafkaConfig(props); Time mock = new MockTime(); kafkaServer = TestUtils.createServer(config, mock); @@ -175,7 +179,7 @@ public class KafkaComponent implements InMemoryComponent { for(Topic topic : getTopics()) { try { - createTopic(topic.name, topic.numPartitions, true); + createTopic(topic.name, topic.numPartitions, KAFKA_PROPAGATE_TIMEOUT_MS); } catch (InterruptedException e) { throw new RuntimeException("Unable to create topic", e); } @@ -288,26 +292,26 @@ public class KafkaComponent implements InMemoryComponent { } public void createTopic(String name) throws InterruptedException { - createTopic(name, 1, true); + createTopic(name, 1, KAFKA_PROPAGATE_TIMEOUT_MS); } - public void waitUntilMetadataIsPropagated(String topic, int numPartitions) { + public void waitUntilMetadataIsPropagated(String topic, int numPartitions, long timeOutMS) { List<KafkaServer> servers = new ArrayList<>(); servers.add(kafkaServer); for(int part = 0;part < numPartitions;++part) { - TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, part, 5000); + TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, part, timeOutMS); } } - public void createTopic(String name, int numPartitions, boolean waitUntilMetadataIsPropagated) throws InterruptedException { + public void createTopic(String name, int numPartitions, long waitThisLongForMetadataToPropagate) throws InterruptedException { ZkUtils zkUtils = null; Level oldLevel = UnitTestHelper.getJavaLoggingLevel(); try { UnitTestHelper.setJavaLoggingLevel(Level.OFF); zkUtils = ZkUtils.apply(zookeeperConnectString, 30000, 30000, false); AdminUtilsWrapper.createTopic(zkUtils, name, numPartitions, 1, new Properties()); - if (waitUntilMetadataIsPropagated) { - waitUntilMetadataIsPropagated(name, numPartitions); + if (waitThisLongForMetadataToPropagate > 0) { + waitUntilMetadataIsPropagated(name, numPartitions, waitThisLongForMetadataToPropagate); } }catch(TopicExistsException tee) { }finally {