Repository: kafka Updated Branches: refs/heads/trunk 9bc47bc13 -> 23d7fc470
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java index 12c9500..a2b7722 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java @@ -70,6 +70,7 @@ public class SenderTest { REQUEST_TIMEOUT_MS, SEND_BUFFER_SIZE, RECEIVE_BUFFER_SIZE, + metrics, time); @Before @@ -114,6 +115,7 @@ public class SenderTest { REQUEST_TIMEOUT_MS, SEND_BUFFER_SIZE, RECEIVE_BUFFER_SIZE, + new Metrics(), time); Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); RequestSend request1 = completeSend(sender); http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index fdd8914..9ff73f4 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.metrics; @@ -22,25 +18,16 @@ import static org.junit.Assert.fail; import java.util.Arrays; import java.util.concurrent.TimeUnit; - import org.apache.kafka.common.Metric; -import org.apache.kafka.common.metrics.JmxReporter; -import org.apache.kafka.common.metrics.Measurable; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.metrics.Quota; -import org.apache.kafka.common.metrics.QuotaViolationException; -import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Count; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Min; import org.apache.kafka.common.metrics.stats.Percentile; import org.apache.kafka.common.metrics.stats.Percentiles; +import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.Total; -import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing; import org.apache.kafka.common.utils.MockTime; import org.junit.Test; @@ -154,9 +141,10 @@ public class MetricsTest { public void testOldDataHasNoEffect() { Max max = new Max(); long windowMs = 100; - MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS); + int samples = 2; + MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples); max.record(config, 50, time.nanoseconds()); - time.sleep(windowMs); + time.sleep(samples * windowMs); assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.nanoseconds()), EPS); } http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 90e2dcf..99856e9 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.network; @@ -27,16 +23,12 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; -import java.nio.channels.UnresolvedAddressException; import java.util.ArrayList; import java.util.Collections; import java.util.List; - -import org.apache.kafka.common.network.NetworkReceive; -import org.apache.kafka.common.network.NetworkSend; -import org.apache.kafka.common.network.Selectable; -import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -58,7 +50,7 @@ public class SelectorTest { public void setup() throws Exception { this.server = new EchoServer(); this.server.start(); - this.selector = new Selector(); + this.selector = new Selector(new Metrics(), new MockTime()); } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/test/MetricsBench.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java index 7239b4a..9d98c11 100644 --- a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java +++ b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.test; @@ -27,7 +23,6 @@ import org.apache.kafka.common.metrics.stats.Percentile; import org.apache.kafka.common.metrics.stats.Percentiles; import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing; - public class MetricsBench { public static void main(String[] args) { @@ -48,7 +43,7 @@ public class MetricsBench { } long start = System.nanoTime(); for (int i = 0; i < iters; i++) - child.record(i); + parent.record(i); double ellapsed = (System.nanoTime() - start) / (double) iters; System.out.println(String.format("%.2f ns per metric recording.", ellapsed)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java index 46cf86e..6aab854 100644 --- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java +++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.test; @@ -24,11 +20,11 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.SystemTime; - public class Microbenchmarks { public static void main(String[] args) throws Exception { @@ -112,6 +108,43 @@ public class Microbenchmarks { t1.join(); t2.join(); + System.out.println("Testing locks"); + done.set(false); + final ReentrantLock lock2 = new ReentrantLock(); + Thread t3 = new Thread() { + public void run() { + time.sleep(1); + int counter = 0; + long start = time.nanoseconds(); + for (int i = 0; i < iters; i++) { + lock2.lock(); + counter++; + lock2.unlock(); + } + System.out.println("lock: " + ((System.nanoTime() - start) / iters)); + System.out.println(counter); + done.set(true); + } + }; + + Thread t4 = new Thread() { + public void run() { + int counter = 0; + while (!done.get()) { + time.sleep(1); + lock2.lock(); + counter++; + lock2.unlock(); + } + System.out.println("Counter: " + counter); + } + }; + + t3.start(); + t4.start(); + t3.join(); + t4.join(); + Map<String, Integer> values = new HashMap<String, Integer>(); for (int i = 0; i < 100; i++) values.put(Integer.toString(i), i); http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/config/log4j.properties ---------------------------------------------------------------------- diff --git a/config/log4j.properties b/config/log4j.properties index baa698b..9502254 100644 --- a/config/log4j.properties +++ b/config/log4j.properties @@ -41,7 +41,7 @@ log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH -log4j.appender.cleanerAppender.File=log-cleaner.log +log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n