Repository: kafka Updated Branches: refs/heads/trunk b6765e46c -> e4208b1d5
MINOR: update producer client request timeout in system test Author: Bill Bejeck <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #4168 from bbejeck/MINOR_update_streams_produer_timeout_in_system_test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e4208b1d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e4208b1d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e4208b1d Branch: refs/heads/trunk Commit: e4208b1d5fa1c28ac7e64e2cb039404a14084dc0 Parents: b6765e4 Author: Bill Bejeck <[email protected]> Authored: Thu Nov 2 17:53:15 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Nov 2 17:53:15 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/streams/perf/SimpleBenchmark.java | 2 ++ .../test/java/org/apache/kafka/streams/perf/YahooBenchmark.java | 5 +++++ .../org/apache/kafka/streams/tests/BrokerCompatibilityTest.java | 2 ++ .../test/java/org/apache/kafka/streams/tests/EosTestClient.java | 3 +++ .../java/org/apache/kafka/streams/tests/SmokeTestClient.java | 4 ++-- 5 files changed, 14 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e4208b1d/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index d1e3206..592c0e1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -232,6 +232,8 @@ public class SimpleBenchmark { props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); + //TODO remove this config or set to smaller value when KIP-91 is merged + props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000); return props; } http://git-wip-us.apache.org/repos/asf/kafka/blob/e4208b1d/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java index d98fd7f..9490101 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java @@ -15,7 +15,9 @@ * limitations under the License. */ package org.apache.kafka.streams.perf; + import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -28,6 +30,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -195,6 +198,8 @@ public class YahooBenchmark { CountDownLatch latch = new CountDownLatch(1); Properties props = parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt()); + //TODO remove this config or set to smaller value when KIP-91 is merged + props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000); final KafkaStreams streams = createYahooBenchmarkStreams(props, campaignsTopic, eventsTopic, latch, parent.numRecords); parent.runGenericBenchmark(streams, "Streams Yahoo Performance [records/latency/rec-sec/MB-sec counted]: ", latch); http://git-wip-us.apache.org/repos/asf/kafka/blob/e4208b1d/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java index 3513895..ca7620d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java @@ -68,6 +68,8 @@ public class BrokerCompatibilityTest { streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout); streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), timeout); streamsProperties.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout + 1); + //TODO remove this config or set to smaller value when KIP-91 is merged + streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000); final StreamsBuilder builder = new StreamsBuilder(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e4208b1d/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java index 1c11be4..2bf90d6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.tests; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; @@ -125,6 +126,8 @@ public class EosTestClient extends SmokeTestUtil { props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + //TODO remove this config or set to smaller value when KIP-91 is merged + props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000); final StreamsBuilder builder = new StreamsBuilder(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e4208b1d/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index 4b75702..b4ed127 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -104,6 +104,8 @@ public class SmokeTestClient extends SmokeTestUtil { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); props.put(ProducerConfig.ACKS_CONFIG, "all"); + //TODO remove this config or set to smaller value when KIP-91 is merged + props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000); StreamsBuilder builder = new StreamsBuilder(); Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde); @@ -115,7 +117,6 @@ public class SmokeTestClient extends SmokeTestUtil { return value == null || value != END; } }); - data.process(SmokeTestUtil.printProcessorSupplier("data")); // min @@ -189,7 +190,6 @@ public class SmokeTestClient extends SmokeTestUtil { Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde); KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed); sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum")); - // cnt groupedData.count(TimeWindows.of(TimeUnit.DAYS.toMillis(2)), "uwin-cnt") .toStream().map(
