Repository: kafka Updated Branches: refs/heads/trunk 72eec0a04 -> a5cd34d79
http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index a0680a2..4a0d46e 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -20,7 +20,6 @@ package kafka.producer import java.net.SocketTimeoutException import java.util.Properties -import kafka.admin.AdminUtils import kafka.api.{ProducerRequest, ProducerResponseStatus} import kafka.common.TopicAndPartition import kafka.integration.KafkaServerTestHarness http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala index 9ff47dd..25b2fed 100755 --- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala @@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.{Base64, Utils} import org.slf4j.event.Level import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future} @@ -46,6 +47,56 @@ class CoreUtilsTest extends JUnitSuite with Logging { } @Test + def testTryAll(): Unit = { + case class TestException(key: String) extends Exception + + val recorded = mutable.Map.empty[String, Either[TestException, String]] + def recordingFunction(v: Either[TestException, String]): Unit = { + val key = v match { + case Right(key) => key + case Left(e) => e.key + } + recorded(key) = v + } + + CoreUtils.tryAll(Seq( + () => recordingFunction(Right("valid-0")), + () => recordingFunction(Left(new TestException("exception-1"))), + () => recordingFunction(Right("valid-2")), + () => recordingFunction(Left(new TestException("exception-3"))) + )) + var expected = Map( + "valid-0" -> Right("valid-0"), + "exception-1" -> Left(TestException("exception-1")), + "valid-2" -> Right("valid-2"), + "exception-3" -> Left(TestException("exception-3")) + ) + assertEquals(expected, recorded) + + recorded.clear() + CoreUtils.tryAll(Seq( + () => recordingFunction(Right("valid-0")), + () => recordingFunction(Right("valid-1")) + )) + expected = Map( + "valid-0" -> Right("valid-0"), + "valid-1" -> Right("valid-1") + ) + assertEquals(expected, recorded) + + recorded.clear() + CoreUtils.tryAll(Seq( + () => recordingFunction(Left(new TestException("exception-0"))), + () => recordingFunction(Left(new TestException("exception-1"))) + )) + expected = Map( + "exception-0" -> Left(TestException("exception-0")), + "exception-1" -> Left(TestException("exception-1")) + ) + assertEquals(expected, recorded) + } + + @Test def testCircularIterator() { val l = List(1, 2) val itl = CoreUtils.circularIterator(l) http://git-wip-us.apache.org/repos/asf/kafka/blob/a5cd34d7/core/src/test/scala/unit/kafka/utils/MockScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala index c5f383c..5ebdf40 100644 --- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala +++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala @@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.Time class MockScheduler(val time: Time) extends Scheduler { /* a priority queue of tasks ordered by next execution time */ - var tasks = new PriorityQueue[MockTask]() + private val tasks = new PriorityQueue[MockTask]() def isStarted = true @@ -77,6 +77,12 @@ class MockScheduler(val time: Time) extends Scheduler { tick() } } + + def clear(): Unit = { + this synchronized { + tasks.clear() + } + } }