SAMZA-600: Move Java tests into src/test/java in samza-kafka
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/660d879e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/660d879e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/660d879e Branch: refs/heads/samza-sql Commit: 660d879e7f1218a7289c97d654e8371fd7da6392 Parents: 1202df4 Author: Benjamin Fradet <benjamin dot fradet at gmail dot com> Authored: Sat Mar 14 14:54:06 2015 -0700 Committer: Jakob Homan <[email protected]> Committed: Sat Mar 14 14:54:06 2015 -0700 ---------------------------------------------------------------------- .../samza/system/kafka/MockKafkaProducer.java | 246 +++++++++++++++++++ .../java/org/apache/samza/utils/TestUtils.java | 112 +++++++++ .../samza/system/kafka/MockKafkaProducer.java | 246 ------------------- .../scala/org/apache/samza/utils/TestUtils.java | 112 --------- 4 files changed, 358 insertions(+), 358 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java new file mode 100644 index 0000000..6f498de --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kafka; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.samza.utils.TestUtils; +import org.apache.kafka.common.MetricName; + + +public class MockKafkaProducer implements Producer<byte[], byte[]> { + + private Cluster _cluster; + private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>(); + private boolean shouldBuffer = false; + private boolean errorNext = false; + private Exception exception = null; + private AtomicInteger msgsSent = new AtomicInteger(0); + + /* + * Helps mock out buffered behavior seen in KafkaProducer. This MockKafkaProducer enables you to: + * - Create send that will instantly succeed & return a successful future + * - Set error for the next message that is sent (using errorNext). In this case, the next call to send returns a + * future with exception. + * Please note that errorNext is reset to false once a message send has failed. This means that errorNext has to be + * manually set to true in the unit test, before expecting failure for another message. + * - "shouldBuffer" can be turned on to start buffering messages. This will store all the callbacks and execute it + * at a later point of time in a separate thread. This thread NEEDS to be triggered from the unit test itself + * using "startDelayedSendThread" method + * - "Offset" in RecordMetadata is not guranteed to be correct + */ + public MockKafkaProducer(int numNodes, String topicName, int numPartitions) { + this._cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions); + } + + public void setShouldBuffer(boolean shouldBuffer) { + this.shouldBuffer = shouldBuffer; + } + + public void setErrorNext(boolean errorNext, Exception exception) { + this.errorNext = errorNext; + this.exception = exception; + } + + public int getMsgsSent() { + return this.msgsSent.get(); + } + + public Thread startDelayedSendThread(final int sleepTime) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + FutureTask[] callbackArray = new FutureTask[_callbacksList.size()]; + AtomicReferenceArray<FutureTask> _bufferList = new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray)); + ExecutorService executor = Executors.newFixedThreadPool(10); + try { + for(int i = 0; i < _bufferList.length(); i++) { + Thread.sleep(sleepTime); + FutureTask f = _bufferList.get(i); + if(!f.isDone()) { + executor.submit(f).get(); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException ee) { + ee.printStackTrace(); + } + } + }); + t.start(); + return t; + } + + @Override + public Future<RecordMetadata> send(ProducerRecord record) { + return send(record, null); + } + + private RecordMetadata getRecordMetadata(ProducerRecord record) { + return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get()); + } + + @Override + public Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) { + if (errorNext) { + if (shouldBuffer) { + FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() { + @Override + public RecordMetadata call() + throws Exception { + callback.onCompletion(null, exception); + return getRecordMetadata(record); + } + }); + _callbacksList.add(f); + this.errorNext = false; + return f; + } else { + callback.onCompletion(null, this.exception); + this.errorNext = false; + return new FutureFailure(this.exception); + } + } else { + if (shouldBuffer) { + FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() { + @Override + public RecordMetadata call() + throws Exception { + msgsSent.incrementAndGet(); + RecordMetadata metadata = getRecordMetadata(record); + callback.onCompletion(metadata, null); + return metadata; + } + }); + _callbacksList.add(f); + return f; + } else { + int offset = msgsSent.incrementAndGet(); + final RecordMetadata metadata = getRecordMetadata(record); + callback.onCompletion(metadata, null); + return new FutureSuccess(record, offset); + } + } + } + + @Override + public List<PartitionInfo> partitionsFor(String topic) { + return this._cluster.partitionsForTopic(topic); + } + + @Override + public Map<MetricName, Metric> metrics() { + return null; + } + + @Override + public void close() { + + } + + private static class FutureFailure implements Future<RecordMetadata> { + + private final ExecutionException exception; + + public FutureFailure(Exception exception) { + this.exception = new ExecutionException(exception); + } + + @Override + public boolean cancel(boolean interrupt) { + return false; + } + + @Override + public RecordMetadata get() throws ExecutionException { + throw this.exception; + } + + @Override + public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException { + throw this.exception; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + } + + private static class FutureSuccess implements Future<RecordMetadata> { + + private ProducerRecord record; + private final RecordMetadata _metadata; + + public FutureSuccess(ProducerRecord record, int offset) { + this.record = record; + this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset); + } + + @Override + public boolean cancel(boolean interrupt) { + return false; + } + + @Override + public RecordMetadata get() throws ExecutionException { + return this._metadata; + } + + @Override + public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException { + return this._metadata; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java b/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java new file mode 100644 index 0000000..2fa743f --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samza.utils; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; + +import static java.util.Arrays.asList; + + +/** + * Copied from :kafka-clients API as a workaround until KAFKA-1861 is resolved + * Helper functions for writing unit tests + */ +public class TestUtils { + + public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir")); + + public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + public static String DIGITS = "0123456789"; + public static String LETTERS_AND_DIGITS = LETTERS + DIGITS; + + /* A consistent random number generator to make tests repeatable */ + public static final Random seededRandom = new Random(192348092834L); + public static final Random random = new Random(); + + public static Cluster singletonCluster(String topic, int partitions) { + return clusterWith(1, topic, partitions); + } + + public static Cluster clusterWith(int nodes, String topic, int partitions) { + Node[] ns = new Node[nodes]; + for (int i = 0; i < nodes; i++) + ns[i] = new Node(0, "localhost", 1969); + List<PartitionInfo> parts = new ArrayList<PartitionInfo>(); + for (int i = 0; i < partitions; i++) + parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); + return new Cluster(asList(ns), parts); + } + + /** + * Choose a number of random available ports + */ + public static int[] choosePorts(int count) { + try { + ServerSocket[] sockets = new ServerSocket[count]; + int[] ports = new int[count]; + for (int i = 0; i < count; i++) { + sockets[i] = new ServerSocket(0); + ports[i] = sockets[i].getLocalPort(); + } + for (int i = 0; i < count; i++) + sockets[i].close(); + return ports; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Choose an available port + */ + public static int choosePort() { + return choosePorts(1)[0]; + } + + /** + * Generate an array of random bytes + * + * @param size The size of the array + */ + public static byte[] randomBytes(int size) { + byte[] bytes = new byte[size]; + seededRandom.nextBytes(bytes); + return bytes; + } + + /** + * Generate a random string of letters and digits of the given length + * + * @param len The length of the string + * @return The random string + */ + public static String randomString(int len) { + StringBuilder b = new StringBuilder(); + for (int i = 0; i < len; i++) + b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length()))); + return b.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java deleted file mode 100644 index 6f498de..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.system.kafka; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceArray; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.samza.utils.TestUtils; -import org.apache.kafka.common.MetricName; - - -public class MockKafkaProducer implements Producer<byte[], byte[]> { - - private Cluster _cluster; - private List<FutureTask<RecordMetadata>> _callbacksList = new ArrayList<FutureTask<RecordMetadata>>(); - private boolean shouldBuffer = false; - private boolean errorNext = false; - private Exception exception = null; - private AtomicInteger msgsSent = new AtomicInteger(0); - - /* - * Helps mock out buffered behavior seen in KafkaProducer. This MockKafkaProducer enables you to: - * - Create send that will instantly succeed & return a successful future - * - Set error for the next message that is sent (using errorNext). In this case, the next call to send returns a - * future with exception. - * Please note that errorNext is reset to false once a message send has failed. This means that errorNext has to be - * manually set to true in the unit test, before expecting failure for another message. - * - "shouldBuffer" can be turned on to start buffering messages. This will store all the callbacks and execute it - * at a later point of time in a separate thread. This thread NEEDS to be triggered from the unit test itself - * using "startDelayedSendThread" method - * - "Offset" in RecordMetadata is not guranteed to be correct - */ - public MockKafkaProducer(int numNodes, String topicName, int numPartitions) { - this._cluster = TestUtils.clusterWith(numNodes, topicName, numPartitions); - } - - public void setShouldBuffer(boolean shouldBuffer) { - this.shouldBuffer = shouldBuffer; - } - - public void setErrorNext(boolean errorNext, Exception exception) { - this.errorNext = errorNext; - this.exception = exception; - } - - public int getMsgsSent() { - return this.msgsSent.get(); - } - - public Thread startDelayedSendThread(final int sleepTime) { - Thread t = new Thread(new Runnable() { - @Override - public void run() { - FutureTask[] callbackArray = new FutureTask[_callbacksList.size()]; - AtomicReferenceArray<FutureTask> _bufferList = new AtomicReferenceArray<FutureTask>(_callbacksList.toArray(callbackArray)); - ExecutorService executor = Executors.newFixedThreadPool(10); - try { - for(int i = 0; i < _bufferList.length(); i++) { - Thread.sleep(sleepTime); - FutureTask f = _bufferList.get(i); - if(!f.isDone()) { - executor.submit(f).get(); - } - } - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException ee) { - ee.printStackTrace(); - } - } - }); - t.start(); - return t; - } - - @Override - public Future<RecordMetadata> send(ProducerRecord record) { - return send(record, null); - } - - private RecordMetadata getRecordMetadata(ProducerRecord record) { - return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get()); - } - - @Override - public Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) { - if (errorNext) { - if (shouldBuffer) { - FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() { - @Override - public RecordMetadata call() - throws Exception { - callback.onCompletion(null, exception); - return getRecordMetadata(record); - } - }); - _callbacksList.add(f); - this.errorNext = false; - return f; - } else { - callback.onCompletion(null, this.exception); - this.errorNext = false; - return new FutureFailure(this.exception); - } - } else { - if (shouldBuffer) { - FutureTask<RecordMetadata> f = new FutureTask<RecordMetadata>(new Callable<RecordMetadata>() { - @Override - public RecordMetadata call() - throws Exception { - msgsSent.incrementAndGet(); - RecordMetadata metadata = getRecordMetadata(record); - callback.onCompletion(metadata, null); - return metadata; - } - }); - _callbacksList.add(f); - return f; - } else { - int offset = msgsSent.incrementAndGet(); - final RecordMetadata metadata = getRecordMetadata(record); - callback.onCompletion(metadata, null); - return new FutureSuccess(record, offset); - } - } - } - - @Override - public List<PartitionInfo> partitionsFor(String topic) { - return this._cluster.partitionsForTopic(topic); - } - - @Override - public Map<MetricName, Metric> metrics() { - return null; - } - - @Override - public void close() { - - } - - private static class FutureFailure implements Future<RecordMetadata> { - - private final ExecutionException exception; - - public FutureFailure(Exception exception) { - this.exception = new ExecutionException(exception); - } - - @Override - public boolean cancel(boolean interrupt) { - return false; - } - - @Override - public RecordMetadata get() throws ExecutionException { - throw this.exception; - } - - @Override - public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException { - throw this.exception; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return true; - } - } - - private static class FutureSuccess implements Future<RecordMetadata> { - - private ProducerRecord record; - private final RecordMetadata _metadata; - - public FutureSuccess(ProducerRecord record, int offset) { - this.record = record; - this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset); - } - - @Override - public boolean cancel(boolean interrupt) { - return false; - } - - @Override - public RecordMetadata get() throws ExecutionException { - return this._metadata; - } - - @Override - public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException { - return this._metadata; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/660d879e/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java b/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java deleted file mode 100644 index 2fa743f..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samza.utils; - -import java.io.File; -import java.io.IOException; -import java.net.ServerSocket; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; - -import static java.util.Arrays.asList; - - -/** - * Copied from :kafka-clients API as a workaround until KAFKA-1861 is resolved - * Helper functions for writing unit tests - */ -public class TestUtils { - - public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir")); - - public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; - public static String DIGITS = "0123456789"; - public static String LETTERS_AND_DIGITS = LETTERS + DIGITS; - - /* A consistent random number generator to make tests repeatable */ - public static final Random seededRandom = new Random(192348092834L); - public static final Random random = new Random(); - - public static Cluster singletonCluster(String topic, int partitions) { - return clusterWith(1, topic, partitions); - } - - public static Cluster clusterWith(int nodes, String topic, int partitions) { - Node[] ns = new Node[nodes]; - for (int i = 0; i < nodes; i++) - ns[i] = new Node(0, "localhost", 1969); - List<PartitionInfo> parts = new ArrayList<PartitionInfo>(); - for (int i = 0; i < partitions; i++) - parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); - return new Cluster(asList(ns), parts); - } - - /** - * Choose a number of random available ports - */ - public static int[] choosePorts(int count) { - try { - ServerSocket[] sockets = new ServerSocket[count]; - int[] ports = new int[count]; - for (int i = 0; i < count; i++) { - sockets[i] = new ServerSocket(0); - ports[i] = sockets[i].getLocalPort(); - } - for (int i = 0; i < count; i++) - sockets[i].close(); - return ports; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Choose an available port - */ - public static int choosePort() { - return choosePorts(1)[0]; - } - - /** - * Generate an array of random bytes - * - * @param size The size of the array - */ - public static byte[] randomBytes(int size) { - byte[] bytes = new byte[size]; - seededRandom.nextBytes(bytes); - return bytes; - } - - /** - * Generate a random string of letters and digits of the given length - * - * @param len The length of the string - * @return The random string - */ - public static String randomString(int len) { - StringBuilder b = new StringBuilder(); - for (int i = 0; i < len; i++) - b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length()))); - return b.toString(); - } - -}
