Updated Branches: refs/heads/trunk 3220af1fe -> fa6339c19
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
new file mode 100644
index 0000000..64d172d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -0,0 +1,184 @@
+package org.apache.kafka.test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.apache.kafka.common.utils.SystemTime;
+
+
+/**
+ * Ad-hoc microbenchmarks for a few primitive operations (math, clocks, random numbers, binary search,
+ * lock acquisition and map reads). Each timing is printed as the average nanoseconds per iteration.
+ */
+public class Microbenchmarks {
+
+    /**
+     * Run every benchmark in sequence.
+     *
+     * @param args A single argument: the number of iterations to run each benchmark for
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println("USAGE: java " + Microbenchmarks.class.getName() + " num_iterations");
+            System.exit(1);
+        }
+
+        final int iters = Integer.parseInt(args[0]);
+        double x = 0.0;
+        long start = System.nanoTime();
+        for (int i = 0; i < iters; i++)
+            x += Math.sqrt(x);
+        // print the result so the loop's work is observable
+        System.out.println(x);
+        System.out.println("sqrt: " + (System.nanoTime() - start) / (double) iters);
+
+        // test clocks; each is first run once untimed, with the result discarded
+        systemMillis(iters);
+        systemNanos(iters);
+        long total = 0;
+        start = System.nanoTime();
+        total += systemMillis(iters);
+        System.out.println("System.currentTimeMillis(): " + (System.nanoTime() - start) / (double) iters);
+        start = System.nanoTime();
+        total += systemNanos(iters);
+        System.out.println("System.nanoTime(): " + (System.nanoTime() - start) / (double) iters);
+        System.out.println(total);
+
+        // test random
+        int n = 0;
+        Random random = new Random();
+        start = System.nanoTime();
+        for (int i = 0; i < iters; i++) {
+            n += random.nextInt();
+        }
+        System.out.println(n);
+        System.out.println("random: " + (System.nanoTime() - start) / (double) iters);
+
+        // test binary search over a sorted array of random floats
+        float[] floats = new float[1024];
+        for (int i = 0; i < floats.length; i++)
+            floats[i] = random.nextFloat();
+        Arrays.sort(floats);
+
+        int loc = 0;
+        start = System.nanoTime();
+        for (int i = 0; i < iters; i++)
+            loc += Arrays.binarySearch(floats, floats[i % floats.length]);
+        System.out.println(loc);
+        System.out.println("binary search: " + (System.nanoTime() - start) / (double) iters);
+
+        // test synchronized: t1 is timed while t2 periodically takes the same lock
+        final SystemTime time = new SystemTime();
+        final AtomicBoolean done = new AtomicBoolean(false);
+        final Object lock = new Object();
+        Thread t1 = new Thread() {
+            public void run() {
+                time.sleep(1);
+                int counter = 0;
+                // read the same clock at both ends of the measurement
+                long start = System.nanoTime();
+                for (int i = 0; i < iters; i++) {
+                    synchronized (lock) {
+                        counter++;
+                    }
+                }
+                System.out.println("synchronized: " + ((System.nanoTime() - start) / (double) iters));
+                System.out.println(counter);
+                done.set(true);
+            }
+        };
+
+        Thread t2 = new Thread() {
+            public void run() {
+                int counter = 0;
+                while (!done.get()) {
+                    time.sleep(1);
+                    synchronized (lock) {
+                        counter += 1;
+                    }
+                }
+                System.out.println("Counter: " + counter);
+            }
+        };
+
+        t1.start();
+        t2.start();
+        t1.join();
+        t2.join();
+
+        // test map reads from two concurrent threads
+        Map<String, Integer> values = new HashMap<String, Integer>();
+        for (int i = 0; i < 100; i++)
+            values.put(Integer.toString(i), i);
+        System.out.println("HashMap:");
+        benchMap(2, 1000000, values);
+        System.out.println("ConcurrentHashMap:");
+        benchMap(2, 1000000, new ConcurrentHashMap<String, Integer>(values));
+        System.out.println("CopyOnWriteMap:");
+        benchMap(2, 1000000, new CopyOnWriteMap<String, Integer>(values));
+    }
+
+    /**
+     * Time read access to the given map from several threads at once. Each thread prints its own
+     * average time per lookup followed by the sum of the values it read.
+     *
+     * @param numThreads The number of reader threads to start
+     * @param iters The number of lookups each thread performs
+     * @param map The map to read from
+     */
+    private static void benchMap(int numThreads, final int iters, final Map<String, Integer> map) throws Exception {
+        final List<String> keys = new ArrayList<String>(map.keySet());
+        final List<Thread> threads = new ArrayList<Thread>();
+        for (int i = 0; i < numThreads; i++) {
+            threads.add(new Thread() {
+                public void run() {
+                    int sum = 0;
+                    long start = System.nanoTime();
+                    // cycle through every key, not just the first numThreads of them
+                    for (int j = 0; j < iters; j++)
+                        sum += map.get(keys.get(j % keys.size()));
+                    System.out.println("Map access time: " + ((System.nanoTime() - start) / (double) iters));
+                    System.out.println(sum);
+                }
+            });
+        }
+        for (Thread thread : threads)
+            thread.start();
+        for (Thread thread : threads)
+            thread.join();
+    }
+
+    /**
+     * Call System.currentTimeMillis() the given number of times.
+     *
+     * @param iters The number of calls to make
+     * @return The sum of all values read
+     */
+    private static long systemMillis(int iters) {
+        long total = 0;
+        for (int i = 0; i < iters; i++)
+            total += System.currentTimeMillis();
+        return total;
+    }
+
+    /**
+     * Call System.nanoTime() the given number of times.
+     *
+     * @param iters The number of calls to make
+     * @return The sum of all values read
+     */
+    private static long systemNanos(int iters) {
+        long total = 0;
+        for (int i = 0; i < iters; i++)
+            total += System.nanoTime();
+        return total;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
new file mode 100644
index 0000000..0c69c5f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -0,0 +1,113 @@
+package org.apache.kafka.test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.utils.Time;
+
+
+/**
+ * A fake selector to use for testing. No network I/O is performed: connection events and sends are
+ * recorded in lists that keep accumulating until {@link #clear()} is called.
+ */
+public class MockSelector implements Selectable {
+
+    private final Time time;
+    private final List<NetworkSend> completedSends = new ArrayList<NetworkSend>();
+    private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
+    private final List<Integer> disconnected = new ArrayList<Integer>();
+    private final List<Integer> connected = new ArrayList<Integer>();
+
+    public MockSelector(Time time) {
+        this.time = time;
+    }
+
+    /**
+     * Record the given id as connected; the address and buffer sizes are ignored.
+     */
+    @Override
+    public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+        this.connected.add(id);
+    }
+
+    /**
+     * Record the given id as disconnected.
+     */
+    @Override
+    public void disconnect(int id) {
+        this.disconnected.add(id);
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void wakeup() {
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Forget all recorded sends, receives, connections and disconnections.
+     */
+    public void clear() {
+        this.completedSends.clear();
+        this.completedReceives.clear();
+        this.disconnected.clear();
+        this.connected.clear();
+    }
+
+    /**
+     * Mark every given send as completed, then sleep on the supplied clock for the timeout.
+     */
+    @Override
+    public void poll(long timeout, List<NetworkSend> sends) throws IOException {
+        this.completedSends.addAll(sends);
+        time.sleep(timeout);
+    }
+
+    @Override
+    public List<NetworkSend> completedSends() {
+        return completedSends;
+    }
+
+    /**
+     * Add a send to the list returned by {@link #completedSends()}.
+     */
+    public void completeSend(NetworkSend send) {
+        this.completedSends.add(send);
+    }
+
+    @Override
+    public List<NetworkReceive> completedReceives() {
+        return completedReceives;
+    }
+
+    /**
+     * Add a receive to the list returned by {@link #completedReceives()}.
+     */
+    public void completeReceive(NetworkReceive receive) {
+        this.completedReceives.add(receive);
+    }
+
+    @Override
+    public List<Integer> disconnected() {
+        return disconnected;
+    }
+
+    @Override
+    public List<Integer> connected() {
+        return connected;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
new file mode 100644
index 0000000..541bc59
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -0,0 +1,122 @@
+package org.apache.kafka.test;
+
+import static java.util.Arrays.asList;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+
+
+/**
+ * Helper functions for writing unit tests
+ */
+public class TestUtils {
+
+    public static final File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
+
+    public static final String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+    public static final String DIGITS = "0123456789";
+    public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS;
+
+    /* A consistent random number generator to make tests repeatable */
+    public static final Random seededRandom = new Random(192348092834L);
+    public static final Random random = new Random();
+
+    /**
+     * Create a cluster with a single node and the given topic
+     */
+    public static Cluster singletonCluster(String topic, int partitions) {
+        return clusterWith(1, topic, partitions);
+    }
+
+    /**
+     * Create a cluster of the given number of nodes (all on localhost) with a single topic. Each
+     * partition is given one of the nodes in round-robin order plus the full node list.
+     *
+     * @param nodes The number of nodes in the cluster
+     * @param topic The name of the topic
+     * @param partitions The number of partitions in the topic
+     */
+    public static Cluster clusterWith(int nodes, String topic, int partitions) {
+        Node[] ns = new Node[nodes];
+        // give every node its own id so that the nodes are distinguishable
+        for (int i = 0; i < nodes; i++)
+            ns[i] = new Node(i, "localhost", 1969);
+        List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
+        for (int i = 0; i < partitions; i++)
+            parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+        return new Cluster(asList(ns), parts);
+    }
+
+    /**
+     * Choose a number of random available ports
+     *
+     * @param count The number of ports to choose
+     * @return The chosen ports
+     */
+    public static int[] choosePorts(int count) {
+        ServerSocket[] sockets = new ServerSocket[count];
+        try {
+            int[] ports = new int[count];
+            // hold every socket open until all ports are chosen so that no port is returned twice
+            for (int i = 0; i < count; i++) {
+                sockets[i] = new ServerSocket(0);
+                ports[i] = sockets[i].getLocalPort();
+            }
+            return ports;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            // release whatever was opened, including when a later bind failed part-way through
+            for (ServerSocket socket : sockets) {
+                if (socket == null)
+                    continue;
+                try {
+                    socket.close();
+                } catch (IOException ignored) {
+                    // nothing more can be done for a socket that fails to close
+                }
+            }
+        }
+    }
+
+    /**
+     * Choose an available port
+     */
+    public static int choosePort() {
+        return choosePorts(1)[0];
+    }
+
+    /**
+     * Generate an array of random bytes
+     *
+     * @param size The size of the array
+     * @return The random bytes
+     */
+    public static byte[] randomBytes(int size) {
+        byte[] bytes = new byte[size];
+        seededRandom.nextBytes(bytes);
+        return bytes;
+    }
+
+    /**
+     * Generate a random string of letters and digits of the given length
+     *
+     * @param len The length of the string
+     * @return The random string
+     */
+    public static String randomString(int len) {
+        StringBuilder b = new StringBuilder();
+        for (int i = 0; i < len; i++)
+            b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
+        return b.toString();
+    }
+
+}
