SAMZA-412; replace assert calls in tests with appropriate JUnit assert methods
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/811f2897 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/811f2897 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/811f2897 Branch: refs/heads/master Commit: 811f2897c640c684f4506a6bab7590304de34fca Parents: cb40a59 Author: David Chen <[email protected]> Authored: Tue Sep 16 09:29:20 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Sep 16 09:29:20 2014 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/TestConfig.java | 45 ++-- .../metrics/TestSlidingTimeWindowReservoir.java | 20 +- .../org/apache/samza/metrics/TestTimer.java | 14 +- .../samza/util/TestBlockingEnvelopeMap.java | 2 + .../samza/util/TestNoOpMetricsRegistry.java | 26 ++- ...inglePartitionWithoutOffsetsSystemAdmin.java | 10 +- .../samza/checkpoint/TestCheckpointTool.scala | 3 +- .../samza/checkpoint/TestOffsetManager.scala | 11 +- .../factories/TestPropertiesConfigFactory.scala | 4 +- .../samza/container/TestSamzaContainer.scala | 14 +- .../samza/container/TestTaskInstance.scala | 29 +-- .../TestTaskNamesToSystemStreamPartitions.scala | 6 +- .../grouper/stream/GroupByTestBase.scala | 13 +- .../grouper/stream/TestGroupByPartition.scala | 10 +- .../TestGroupBySystemStreamPartition.scala | 15 +- .../org/apache/samza/job/TestJobRunner.scala | 5 +- .../samza/job/TestShellCommandBuilder.scala | 2 +- .../apache/samza/job/local/TestProcessJob.scala | 3 +- .../apache/samza/job/local/TestThreadJob.scala | 3 +- .../apache/samza/metrics/TestJmxServer.scala | 17 +- .../metrics/reporter/TestJmxReporter.scala | 19 +- .../samza/serializers/TestByteSerde.scala | 11 +- .../samza/serializers/TestCheckpointSerde.scala | 2 + .../samza/serializers/TestIntegerSerde.scala | 7 +- .../samza/serializers/TestStringSerde.scala | 5 +- .../samza/system/TestSystemConsumers.scala | 19 +- .../system/chooser/TestBatchingChooser.scala | 22 +- .../chooser/TestBootstrappingChooser.scala | 70 +++--- .../system/chooser/TestDefaultChooser.scala | 23 +- .../system/chooser/TestRoundRobinChooser.scala | 19 +- .../chooser/TestTieredPriorityChooser.scala | 56 ++--- .../filereader/TestFileReaderSystemAdmin.scala | 16 +- .../TestFileReaderSystemConsumer.scala | 2 + .../TestFileReaderSystemFactory.scala | 7 +- .../samza/task/TestReadableCoordinator.scala | 4 +- .../samza/util/TestDaemonThreadFactory.scala | 7 +- .../util/TestExponentialSleepStrategy.scala | 4 +- .../scala/org/apache/samza/util/TestUtil.scala | 2 + .../kafka/TestKafkaCheckpointLogKey.scala | 2 +- .../kafka/TestKafkaCheckpointManager.scala | 9 +- .../apache/samza/config/TestKafkaConfig.scala | 11 +- .../samza/config/TestKafkaSerdeConfig.scala | 8 +- .../samza/config/TestRegExTopicGenerator.scala | 6 +- .../samza/serializers/TestKafkaSerde.scala | 8 +- .../samza/system/kafka/TestBrokerProxy.scala | 26 +-- .../samza/system/kafka/TestGetOffset.scala | 16 +- .../system/kafka/TestKafkaSystemAdmin.scala | 37 ++-- .../system/kafka/TestKafkaSystemConsumer.scala | 15 +- .../system/kafka/TestKafkaSystemFactory.scala | 11 +- .../system/kafka/TestKafkaSystemProducer.scala | 24 ++- .../system/kafka/TestTopicMetadataCache.scala | 14 +- .../samza/storage/kv/TestKeyValueStores.scala | 16 +- .../samza/logging/log4j/TestJmxAppender.java | 5 +- .../samza/serializers/TestJsonSerde.scala | 4 +- .../serializers/TestMetricsSnapshotSerde.scala | 14 +- .../test/integration/TestStatefulTask.scala | 11 +- .../TestSamzaContainerPerformance.scala | 17 +- .../samza/job/yarn/TestSamzaAppMaster.scala | 25 +-- .../job/yarn/TestSamzaAppMasterLifecycle.scala | 16 +- .../job/yarn/TestSamzaAppMasterService.scala | 18 +- .../yarn/TestSamzaAppMasterTaskManager.scala | 214 +++++++++---------- 61 files changed, 587 insertions(+), 487 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/config/TestConfig.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java index e701296..b4100c2 100644 --- a/samza-api/src/test/java/org/apache/samza/config/TestConfig.java +++ b/samza-api/src/test/java/org/apache/samza/config/TestConfig.java @@ -19,13 +19,18 @@ package org.apache.samza.config; -import org.junit.Assert.* ; -import org.junit.Test ; -import java.util.Map ; -import java.util.HashMap ; +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import java.util.HashMap; + +import org.junit.Test; public class TestConfig { - // Utility methods to make it easier to tell the class of a primitive via overloaded args + /** + * Utility methods to make it easier to tell the class of a primitive via + * overloaded args + */ Class getClass(long l) { return Long.class ; } @@ -35,25 +40,25 @@ public class TestConfig { } @Test - public void testgetShortAndLong(){ - Map<String, String> m = new HashMap<String, String>() { { - put("testkey", "11") ; - } } ; + public void testgetShortAndLong() { + Map<String, String> m = new HashMap<String, String>() {{ + put("testkey", "11"); + }}; - MapConfig mc = new MapConfig(m) ; - short defaultShort=0 ; - long defaultLong=0 ; + MapConfig mc = new MapConfig(m); + short defaultShort = 0; + long defaultLong = 0; - Class c1 = getClass(mc.getShort("testkey")) ; - assert(c1 == Short.class) ; + Class c1 = getClass(mc.getShort("testkey")); + assertEquals(Short.class, c1); - Class c2 = getClass(mc.getShort("testkey", defaultShort)) ; - assert(c2 == Short.class) ; + Class c2 = getClass(mc.getShort("testkey", defaultShort)); + assertEquals(Short.class, c2); - Class c3 = getClass(mc.getLong("testkey")) ; - assert(c3 == Long.class) ; + Class c3 = getClass(mc.getLong("testkey")); + assertEquals(Long.class, c3); - Class c4 = getClass(mc.getLong("testkey", defaultLong)) ; - assert(c4 == Long.class) ; + Class c4 = getClass(mc.getLong("testkey", defaultLong)); + assertEquals(Long.class, c4); } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java index eb5043b..d392b32 100644 --- a/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestSlidingTimeWindowReservoir.java @@ -19,6 +19,7 @@ package org.apache.samza.metrics; +import static org.mockito.Mockito.*; import static org.junit.Assert.*; import java.util.Arrays; @@ -26,15 +27,14 @@ import java.util.Arrays; import org.apache.samza.util.Clock; import org.junit.Test; -import static org.mockito.Mockito.*; - public class TestSlidingTimeWindowReservoir { private final Clock clock = mock(Clock.class); @Test public void testUpdateSizeSnapshot() { - SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock); + SlidingTimeWindowReservoir slidingTimeWindowReservoir = + new SlidingTimeWindowReservoir(300, clock); when(clock.currentTimeMillis()).thenReturn(0L); slidingTimeWindowReservoir.update(1L); @@ -49,24 +49,26 @@ public class TestSlidingTimeWindowReservoir { Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot(); assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L))); - assertTrue(snapshot.getSize() == 3); + assertEquals(3, snapshot.getSize()); } @Test public void testDuplicateTime() { - SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock); + SlidingTimeWindowReservoir slidingTimeWindowReservoir = + new SlidingTimeWindowReservoir(300, clock); when(clock.currentTimeMillis()).thenReturn(0L); slidingTimeWindowReservoir.update(1L); slidingTimeWindowReservoir.update(2L); Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot(); assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L))); - assertTrue(snapshot.getSize() == 2); + assertEquals(2, snapshot.getSize()); } @Test public void testRemoveExpiredValues() { - SlidingTimeWindowReservoir slidingTimeWindowReservoir = new SlidingTimeWindowReservoir(300, clock); + SlidingTimeWindowReservoir slidingTimeWindowReservoir = + new SlidingTimeWindowReservoir(300, clock); when(clock.currentTimeMillis()).thenReturn(0L); slidingTimeWindowReservoir.update(1L); @@ -81,6 +83,6 @@ public class TestSlidingTimeWindowReservoir { Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot(); assertTrue(snapshot.getValues().containsAll(Arrays.asList(3L, 4L))); - assertTrue(snapshot.getSize() == 2); + assertEquals(2, snapshot.getSize()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java index dcc3cb8..63c183f 100644 --- a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java @@ -28,7 +28,9 @@ import org.junit.Test; public class TestTimer { - // mock clock + /* + * Mock clock + */ private final Clock clock = new Clock() { long value = 0; @@ -46,7 +48,7 @@ public class TestTimer { Snapshot snapshot = timer.getSnapshot(); assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L))); - assertTrue(snapshot.getValues().size() == 2); + assertEquals(2, snapshot.getValues().size()); } @Test @@ -58,13 +60,13 @@ public class TestTimer { Snapshot snapshot = timer.getSnapshot(); assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L))); - assertTrue(snapshot.getValues().size() == 3); + assertEquals(3, snapshot.getValues().size()); - // the time is 500 for update(4L) because getSnapshot calls clock once + 3 + // The time is 500 for update(4L) because getSnapshot calls clock once + 3 // updates that call clock 3 times timer.update(4L); Snapshot snapshot2 = timer.getSnapshot(); assertTrue(snapshot2.getValues().containsAll(Arrays.asList(3L, 4L))); - assertTrue(snapshot2.getValues().size() == 2); + assertEquals(2, snapshot2.getValues().size()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java index 35ba52d..4eb87eb 100644 --- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java +++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java @@ -22,6 +22,7 @@ package org.apache.samza.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; + import java.util.HashSet; import java.util.List; import java.util.Map; @@ -30,6 +31,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; + import org.apache.samza.Partition; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStreamPartition; http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java index 2d0034f..1d1e3c5 100644 --- a/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java +++ b/samza-api/src/test/java/org/apache/samza/util/TestNoOpMetricsRegistry.java @@ -33,32 +33,38 @@ public class TestNoOpMetricsRegistry { Counter counter1 = registry.newCounter("testc", "a"); Counter counter2 = registry.newCounter("testc", "b"); Counter counter3 = registry.newCounter("testc2", "c"); + Gauge<String> gauge1 = registry.newGauge("testg", "a", "1"); Gauge<String> gauge2 = registry.newGauge("testg", "b", "2"); Gauge<String> gauge3 = registry.newGauge("testg", "c", "3"); Gauge<String> gauge4 = registry.newGauge("testg2", "d", "4"); + Timer timer1 = registry.newTimer("testt", "a"); Timer timer2 = registry.newTimer("testt", "b"); Timer timer3 = registry.newTimer("testt2", "c"); + counter1.inc(); counter2.inc(2); counter3.inc(4); + gauge1.set("5"); gauge2.set("6"); gauge3.set("7"); gauge4.set("8"); + timer1.update(1L); timer2.update(2L); timer3.update(3L); - assertEquals(counter1.getCount(), 1); - assertEquals(counter2.getCount(), 2); - assertEquals(counter3.getCount(), 4); - assertEquals(gauge1.getValue(), "5"); - assertEquals(gauge2.getValue(), "6"); - assertEquals(gauge3.getValue(), "7"); - assertEquals(gauge4.getValue(), "8"); - assertEquals(timer1.getSnapshot().getAverage(), 1, 0); - assertEquals(timer2.getSnapshot().getAverage(), 2, 0); - assertEquals(timer3.getSnapshot().getAverage(), 3, 0); + + assertEquals(1, counter1.getCount()); + assertEquals(2, counter2.getCount()); + assertEquals(4, counter3.getCount()); + assertEquals("5", gauge1.getValue()); + assertEquals("6", gauge2.getValue()); + assertEquals("7", gauge3.getValue()); + assertEquals("8", gauge4.getValue()); + assertEquals(1, timer1.getSnapshot().getAverage(), 0); + assertEquals(2, timer2.getSnapshot().getAverage(), 0); + assertEquals(3, timer3.getSnapshot().getAverage(), 0); } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java index 4166493..025f0a6 100644 --- a/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java +++ b/samza-api/src/test/java/org/apache/samza/util/TestSinglePartitionWithoutOffsetsSystemAdmin.java @@ -20,10 +20,12 @@ package org.apache.samza.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import java.util.HashSet; import java.util.Map; import java.util.Set; + import org.apache.samza.Partition; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.SystemStreamMetadata; @@ -36,12 +38,14 @@ public class TestSinglePartitionWithoutOffsetsSystemAdmin { Set<String> streamNames = new HashSet<String>(); streamNames.add("a"); streamNames.add("b"); + Map<String, SystemStreamMetadata> metadata = admin.getSystemStreamMetadata(streamNames); - assertEquals(metadata.size(), 2); + assertEquals(2, metadata.size()); SystemStreamMetadata metadata1 = metadata.get("a"); SystemStreamMetadata metadata2 = metadata.get("b"); + assertEquals(1, metadata1.getSystemStreamPartitionMetadata().size()); assertEquals(1, metadata2.getSystemStreamPartitionMetadata().size()); - assertEquals(null, metadata.get(new SystemStreamPartition("test-system", "c", new Partition(0)))); + assertNull(metadata.get(new SystemStreamPartition("test-system", "c", new Partition(0)))); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala index 1eb3995..af800df 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala @@ -20,6 +20,7 @@ package org.apache.samza.checkpoint import org.apache.samza.Partition +import org.apache.samza.container.TaskName import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory} import org.apache.samza.config.{Config, MapConfig, SystemConfig, TaskConfig} import org.apache.samza.metrics.MetricsRegistry @@ -30,8 +31,8 @@ import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.junit.AssertionsForJUnit import org.scalatest.mock.MockitoSugar + import scala.collection.JavaConversions._ -import org.apache.samza.container.TaskName object TestCheckpointTool { var checkpointManager: CheckpointManager = null http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index 44a98a5..a79ecca 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -19,7 +19,9 @@ package org.apache.samza.checkpoint -import scala.collection.JavaConversions._ +import java.util + +import org.apache.samza.container.TaskName import org.apache.samza.Partition import org.apache.samza.system.SystemStream import org.apache.samza.system.SystemStreamMetadata @@ -30,10 +32,10 @@ import org.junit.{Ignore, Test} import org.apache.samza.SamzaException import org.apache.samza.config.MapConfig import org.apache.samza.system.SystemAdmin -import java.util -import org.apache.samza.container.TaskName import org.scalatest.Assertions.intercept +import scala.collection.JavaConversions._ + class TestOffsetManager { @Test def testSystemShouldUseDefaults { @@ -47,7 +49,7 @@ class TestOffsetManager { val offsetManager = OffsetManager(systemStreamMetadata, config) offsetManager.register(taskName, Set(systemStreamPartition)) offsetManager.start - assertTrue(!offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined) + assertFalse(offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined) assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined) assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get) } @@ -232,7 +234,6 @@ class TestOffsetManager { override def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = taskNameToPartitionMapping = mapping override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = taskNameToPartitionMapping - } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala index f254741..9688abb 100644 --- a/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala +++ b/samza-core/src/test/scala/org/apache/samza/config/factories/TestPropertiesConfigFactory.scala @@ -18,8 +18,10 @@ */ package org.apache.samza.config.factories + import java.net.URI import java.io.File + import org.apache.samza.SamzaException import org.junit.Assert._ import org.junit.Test @@ -30,7 +32,7 @@ class TestPropertiesConfigFactory { @Test def testCanReadPropertiesConfigFiles { val config = factory.getConfig(URI.create("file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath)) - assert("bar".equals(config.get("foo"))) + assertEquals("bar", config.get("foo")) } @Test http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 8a04a8a..b7a9569 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -24,26 +24,26 @@ import org.junit.Assert._ import org.junit.Test import org.apache.samza.Partition import org.apache.samza.config.MapConfig +import org.apache.samza.metrics.JmxServer +import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.SystemConsumers -import org.apache.samza.system.chooser.RoundRobinChooser import org.apache.samza.system.SystemConsumer import org.apache.samza.system.SystemProducers import org.apache.samza.system.SystemProducer +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.SystemStream +import org.apache.samza.system.StreamMetadataCache +import org.apache.samza.system.chooser.RoundRobinChooser import org.apache.samza.serializers.SerdeManager import org.apache.samza.task.StreamTask import org.apache.samza.task.MessageCollector -import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.task.TaskCoordinator import org.apache.samza.task.InitableTask import org.apache.samza.task.TaskContext import org.apache.samza.task.ClosableTask -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin -import org.apache.samza.system.SystemStream -import org.apache.samza.system.StreamMetadataCache import org.apache.samza.task.TaskInstanceCollector +import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin import org.scalatest.junit.AssertionsForJUnit -import org.apache.samza.metrics.JmxServer class TestSamzaContainer extends AssertionsForJUnit { @Test http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index be53373..c31a74e 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -21,27 +21,28 @@ package org.apache.samza.container import org.junit.Assert._ import org.junit.Test -import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.system.SystemProducers -import org.apache.samza.task.MessageCollector -import org.apache.samza.task.StreamTask -import org.apache.samza.system.SystemConsumers -import org.apache.samza.task.TaskCoordinator -import org.apache.samza.config.MapConfig import org.apache.samza.Partition -import org.apache.samza.system.chooser.RoundRobinChooser -import org.apache.samza.system.SystemProducer +import org.apache.samza.checkpoint.OffsetManager +import org.apache.samza.config.MapConfig import org.apache.samza.serializers.SerdeManager +import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.SystemConsumer +import org.apache.samza.system.SystemConsumers +import org.apache.samza.system.SystemProducer +import org.apache.samza.system.SystemProducers import org.apache.samza.system.SystemStream -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.task.ReadableCoordinator -import org.apache.samza.checkpoint.OffsetManager import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import scala.collection.JavaConversions._ +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.chooser.RoundRobinChooser +import org.apache.samza.task.MessageCollector +import org.apache.samza.task.ReadableCoordinator +import org.apache.samza.task.StreamTask +import org.apache.samza.task.TaskCoordinator import org.apache.samza.task.TaskInstanceCollector +import scala.collection.JavaConversions._ + class TestTaskInstance { @Test def testOffsetsAreUpdatedOnProcess { @@ -80,4 +81,4 @@ class TestTaskInstance { assertTrue(lastProcessedOffset.isDefined) assertEquals("2", lastProcessedOffset.get) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala index d680b20..9a3406e 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskNamesToSystemStreamPartitions.scala @@ -18,10 +18,10 @@ */ package org.apache.samza.container -import org.junit.Test -import org.junit.Assert._ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.{SamzaException, Partition} +import org.junit.Test +import org.junit.Assert._ class TestTaskNamesToSystemStreamPartitions { var sspCounter = 0 @@ -36,7 +36,7 @@ class TestTaskNamesToSystemStreamPartitions { val asSet = tntssp.toSet val expected = Set(new TaskName("tn1") -> Set(makeSSP("tn1-1"), makeSSP("tn1-2")), (new TaskName("tn2") -> Set(makeSSP("tn2-1"), makeSSP("tn2-2")))) - assertEquals(expected , asSet) + assertEquals(expected, asSet) } @Test http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala index 47d716e..a14169b 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala @@ -18,15 +18,16 @@ */ package org.apache.samza.container.grouper.stream -import org.apache.samza.Partition -import org.apache.samza.system.SystemStreamPartition -import org.junit.Test +import java.util.Collections import java.util.HashSet import java.util.Map import java.util.Set -import org.junit.Assert._ -import java.util.Collections + +import org.apache.samza.Partition import org.apache.samza.container.TaskName +import org.apache.samza.system.SystemStreamPartition +import org.junit.Test +import org.junit.Assert._ object GroupByTestBase { val aa0 = new SystemStreamPartition("SystemA", "StreamA", new Partition(0)) @@ -54,4 +55,4 @@ abstract class GroupByTestBase { val result: Map[TaskName, Set[SystemStreamPartition]] = grouper.group(input) assertEquals(output, result) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala index 2fa718c..74daf72 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala @@ -19,15 +19,17 @@ package org.apache.samza.container.grouper.stream import org.apache.samza.container.TaskName -import scala.collection.JavaConverters._ import org.junit.Test +import scala.collection.JavaConverters._ + class TestGroupByPartition extends GroupByTestBase { import GroupByTestBase._ - val expected /* from base class provided set */ = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava, - new TaskName("Partition 1") -> Set(aa1, ab1).asJava, - new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava + // from base class provided set + val expected = Map(new TaskName("Partition 0") -> Set(aa0, ac0).asJava, + new TaskName("Partition 1") -> Set(aa1, ab1).asJava, + new TaskName("Partition 2") -> Set(aa2, ab2).asJava).asJava override def getGrouper: SystemStreamPartitionGrouper = new GroupByPartition http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala index 8da0595..deb3895 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala @@ -19,19 +19,20 @@ package org.apache.samza.container.grouper.stream import org.apache.samza.container.TaskName -import scala.collection.JavaConverters._ import org.junit.Test +import scala.collection.JavaConverters._ class TestGroupBySystemStreamPartition extends GroupByTestBase { import GroupByTestBase._ // Building manually to avoid just duplicating a logic potential logic error here and there - val expected /* from base class provided set */ = Map(new TaskName(aa0.toString) -> Set(aa0).asJava, - new TaskName(aa1.toString) -> Set(aa1).asJava, - new TaskName(aa2.toString) -> Set(aa2).asJava, - new TaskName(ab1.toString) -> Set(ab1).asJava, - new TaskName(ab2.toString) -> Set(ab2).asJava, - new TaskName(ac0.toString) -> Set(ac0).asJava).asJava + // From base class provided set + val expected = Map(new TaskName(aa0.toString) -> Set(aa0).asJava, + new TaskName(aa1.toString) -> Set(aa1).asJava, + new TaskName(aa2.toString) -> Set(aa2).asJava, + new TaskName(ab1.toString) -> Set(ab1).asJava, + new TaskName(ab2.toString) -> Set(ab2).asJava, + new TaskName(ac0.toString) -> Set(ac0).asJava).asJava override def getGrouper: SystemStreamPartitionGrouper = new GroupBySystemStreamPartition http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala index 258ccc1..52057ed 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala @@ -18,9 +18,12 @@ */ package org.apache.samza.job + import java.io.File + import org.apache.samza.config.Config import org.junit.Test +import org.junit.Assert._ object TestJobRunner { var processCount = 0 @@ -34,7 +37,7 @@ class TestJobRunner { "org.apache.samza.config.factories.PropertiesConfigFactory", "--config-path", "file://%s/src/test/resources/test.properties" format new File(".").getCanonicalPath)) - assert(TestJobRunner.processCount == 1) + assertEquals(1, TestJobRunner.processCount) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala index f8a535a..b186ec1 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala @@ -18,12 +18,12 @@ */ package org.apache.samza.job -import org.junit.Test import org.apache.samza.system.SystemStreamPartition import org.apache.samza.Partition import org.apache.samza.util.Util._ import org.apache.samza.container.{TaskName, TaskNamesToSystemStreamPartitions} import org.junit.Assert._ +import org.junit.Test class TestShellCommandBuilder { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala index d56024d..7f3ccfe 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala @@ -18,6 +18,7 @@ */ package org.apache.samza.job.local; + import org.junit.Assert._ import org.junit.Test import org.apache.samza.job.ApplicationStatus @@ -39,6 +40,6 @@ class TestProcessJob { job.waitForFinish(500) job.kill job.waitForFinish(999999) - assert(ApplicationStatus.UnsuccessfulFinish.equals(job.waitForFinish(999999999))) + assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999)) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala index 7d45889..4f3f511 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala @@ -18,6 +18,7 @@ */ package org.apache.samza.job.local + import org.junit.Assert._ import org.junit.Test import org.apache.samza.job.ApplicationStatus @@ -44,6 +45,6 @@ class TestThreadJob { job.waitForFinish(500) job.kill job.waitForFinish(999999) - assert(ApplicationStatus.UnsuccessfulFinish.equals(job.waitForFinish(999999999))) + assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999)) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala index f01117d..f49cfaa 100644 --- a/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala +++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestJmxServer.scala @@ -22,14 +22,15 @@ package org.apache.samza.metrics import org.junit.Assert._ import org.junit.Test import org.apache.samza.util.Logging -import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL} + import java.io.IOException +import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL} class TestJmxServer extends Logging { @Test def serverStartsUp { - var jmxServer:JmxServer = null + var jmxServer: JmxServer = null try { jmxServer = new JmxServer @@ -45,14 +46,16 @@ class TestJmxServer extends Logging { assertTrue("Connected but mbean count is somehow 0", connection.getMBeanCount.intValue() > 0) } catch { case ioe:IOException => fail("Couldn't open connection to local JMX server") - }finally { - if(jmxConnector != null) jmxConnector.close + } finally { + if (jmxConnector != null) { + jmxConnector.close + } } } finally { - if (jmxServer != null) jmxServer.stop + if (jmxServer != null) { + jmxServer.stop + } } - } - } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala index f6c8646..3cfd439 100644 --- a/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala +++ b/samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala @@ -23,20 +23,23 @@ import org.junit.Assert._ import org.junit.AfterClass import org.junit.BeforeClass import org.junit.Test -import scala.collection.JavaConversions._ import org.apache.samza.task.TaskContext -import javax.management.remote.JMXConnectorFactory import org.apache.samza.metrics.MetricsRegistryMap -import javax.management.remote.JMXConnectorServerFactory -import javax.management.remote.JMXConnectorServer -import java.rmi.registry.LocateRegistry -import javax.management.remote.JMXServiceURL import org.apache.samza.config.MapConfig -import java.lang.management.ManagementFactory import org.apache.samza.Partition -import javax.management.ObjectName import org.apache.samza.metrics.JvmMetrics +import java.lang.management.ManagementFactory +import java.rmi.registry.LocateRegistry + +import javax.management.ObjectName +import javax.management.remote.JMXServiceURL +import javax.management.remote.JMXConnectorServerFactory +import javax.management.remote.JMXConnectorServer +import javax.management.remote.JMXConnectorFactory + +import scala.collection.JavaConversions._ + object TestJmxReporter { val port = 4500 val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:%d/jmxapitestrmi" format port) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala index f64c263..f605762 100644 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala @@ -19,19 +19,20 @@ package org.apache.samza.serializers +import java.util.Arrays + import org.junit.Assert._ import org.junit.Test -import java.util.Arrays class TestByteSerde { @Test def testByteSerde { val serde = new ByteSerde - assertEquals(null, serde.toBytes(null)) - assertEquals(null, serde.fromBytes(null)) + assertNull(serde.toBytes(null)) + assertNull(serde.fromBytes(null)) val testBytes = "A lazy way of creating a byte array".getBytes() - assertTrue(Arrays.equals(serde.toBytes(testBytes), testBytes)) - assertTrue( Arrays.equals(serde.fromBytes(testBytes), testBytes)) + assertArrayEquals(serde.toBytes(testBytes), testBytes) + assertArrayEquals(serde.fromBytes(testBytes), testBytes) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala index 0d07314..3d0a603 100644 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala @@ -20,12 +20,14 @@ package org.apache.samza.serializers import java.util + import org.apache.samza.Partition import org.apache.samza.checkpoint.Checkpoint import org.apache.samza.container.TaskName import org.apache.samza.system.SystemStreamPartition import org.junit.Assert._ import org.junit.Test + import scala.collection.JavaConversions._ class TestCheckpointSerde { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala index 45a2b04..ad646d7 100644 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala @@ -19,9 +19,10 @@ package org.apache.samza.serializers +import java.util.Arrays + import org.junit.Assert._ import org.junit.Test -import java.util.Arrays class TestIntegerSerde { @Test @@ -33,7 +34,7 @@ class TestIntegerSerde { val fooBar = 37 val fooBarBytes = serde.toBytes(fooBar) fooBarBytes.foreach(System.err.println) - assertTrue(Arrays.equals(Array[Byte](0, 0, 0, 37), fooBarBytes)) + assertArrayEquals(Array[Byte](0, 0, 0, 37), fooBarBytes) assertEquals(fooBar, serde.fromBytes(fooBarBytes)) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala index 7fbf0c2..a1e8e88 100644 --- a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala @@ -21,7 +21,6 @@ package org.apache.samza.serializers import org.junit.Assert._ import org.junit.Test -import java.util.Arrays class TestStringSerde { @Test @@ -32,7 +31,7 @@ class TestStringSerde { val fooBar = "foo bar" val fooBarBytes = serde.toBytes(fooBar) - assertTrue(Arrays.equals(fooBar.getBytes("UTF-8"), fooBarBytes)) + assertArrayEquals(fooBar.getBytes("UTF-8"), fooBarBytes) assertEquals(fooBar, serde.fromBytes(fooBarBytes)) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala index 04229a6..3fdc781 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala @@ -19,15 +19,16 @@ package org.apache.samza.system -import scala.collection.JavaConversions._ -import org.apache.samza.Partition import org.junit.Assert._ import org.junit.Test +import org.apache.samza.Partition +import org.apache.samza.serializers._ import org.apache.samza.system.chooser.MessageChooser import org.apache.samza.system.chooser.DefaultChooser -import org.apache.samza.util.BlockingEnvelopeMap -import org.apache.samza.serializers._ import org.apache.samza.system.chooser.MockMessageChooser +import org.apache.samza.util.BlockingEnvelopeMap + +import scala.collection.JavaConversions._ class TestSystemConsumers { def testPollIntervalMs { @@ -44,7 +45,7 @@ class TestSystemConsumers { consumers.register(systemStreamPartition1, "1234") consumers.start - // Tell the consumer to respond with 1000 messages for SSP0, and no + // Tell the consumer to respond with 1000 messages for SSP0, and no // messages for SSP1. consumer.setResponseSizes(numEnvelopes) @@ -60,13 +61,13 @@ class TestSystemConsumers { // We aren't polling because we're getting non-null envelopes. assertEquals(2, consumer.polls) - // Advance the clock to trigger a new poll even though there are still + // Advance the clock to trigger a new poll even though there are still // messages. now = SystemConsumers.DEFAULT_POLL_INTERVAL_MS assertEquals(envelope, consumers.choose) - // We polled even though there are still 997 messages in the unprocessed + // We polled even though there are still 997 messages in the unprocessed // message buffer. assertEquals(3, consumer.polls) assertEquals(1, consumer.lastPoll.size) @@ -74,7 +75,7 @@ class TestSystemConsumers { // Only SSP1 was polled because we still have messages for SSP2. assertTrue(consumer.lastPoll.contains(systemStreamPartition1)) - // Now drain all messages for SSP0. There should be exactly 997 messages, + // Now drain all messages for SSP0. There should be exactly 997 messages, // since we have chosen 3 already, and we started with 1000. (0 until (numEnvelopes - 3)).foreach { i => assertEquals(envelope, consumers.choose) @@ -296,4 +297,4 @@ class TestSystemConsumers { def stop {} def register { super.register(systemStreamPartition, "0") } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala index d7632b4..6d53697 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBatchingChooser.scala @@ -19,16 +19,18 @@ package org.apache.samza.system.chooser -import org.junit.Assert._ -import org.junit.Test +import java.util.Arrays + +import org.apache.samza.Partition import org.apache.samza.system.IncomingMessageEnvelope -import scala.collection.immutable.Queue import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.Partition +import org.junit.Assert._ +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters -import java.util.Arrays + +import scala.collection.immutable.Queue @RunWith(value = classOf[Parameterized]) class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) { @@ -45,9 +47,9 @@ class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) { chooser.start // Make sure start and register are working. assertEquals(1, mock.starts) - assertEquals(null, mock.registers(envelope1.getSystemStreamPartition)) + assertNull(mock.registers(envelope1.getSystemStreamPartition)) assertEquals("", mock.registers(envelope2.getSystemStreamPartition)) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope1) assertEquals(envelope1, mock.getEnvelopes.head) assertEquals(envelope1, chooser.choose) @@ -84,11 +86,11 @@ class TestBatchingChooser(getChooser: (MessageChooser, Int) => MessageChooser) { } object TestBatchingChooser { - // Test both BatchingChooser and DefaultChooser here. DefaultChooser with - // just batch size defined should behave just like plain vanilla batching + // Test both BatchingChooser and DefaultChooser here. DefaultChooser with + // just batch size defined should behave just like plain vanilla batching // chooser. @Parameters def parameters: java.util.Collection[Array[(MessageChooser, Int) => MessageChooser]] = Arrays.asList( Array((wrapped: MessageChooser, batchSize: Int) => new BatchingChooser(wrapped, batchSize)), Array((wrapped: MessageChooser, batchSize: Int) => new DefaultChooser(wrapped, Some(batchSize)))) -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala index 993daa6..3c2693c 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala @@ -19,20 +19,22 @@ package org.apache.samza.system.chooser -import org.junit.Assert._ -import org.junit.Test +import java.util.Arrays + import org.apache.samza.system.IncomingMessageEnvelope -import scala.collection.immutable.Queue import org.apache.samza.system.SystemStreamPartition import org.apache.samza.Partition +import org.apache.samza.system.SystemStream import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.junit.Assert._ +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters -import java.util.Arrays + import scala.collection.JavaConversions._ -import org.apache.samza.system.SystemStream +import scala.collection.immutable.Queue @RunWith(value = classOf[Parameterized]) class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) { @@ -61,7 +63,7 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy assertEquals("foo", mock.registers(envelope1.getSystemStreamPartition)) chooser.update(envelope1) assertEquals(envelope1, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.stop assertEquals(1, mock.stops) } @@ -72,16 +74,16 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy val metadata = getMetadata(envelope1, "100", Some("123")) val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata)) - // Even though envelope1's SSP is registered as a bootstrap stream, since - // 123=123, it should be marked as "caught up" and treated like a normal - // stream. This means that non-bootstrap stream envelope should be allowed + // Even though envelope1's SSP is registered as a bootstrap stream, since + // 123=123, it should be marked as "caught up" and treated like a normal + // stream. This means that non-bootstrap stream envelope should be allowed // to be chosen. chooser.register(envelope1.getSystemStreamPartition, "123") chooser.register(envelope2.getSystemStreamPartition, "321") chooser.start chooser.update(envelope2) assertEquals(envelope2, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) } @Test @@ -90,40 +92,40 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy val metadata = getMetadata(envelope1, "123") val chooser = getChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata)) - // Even though envelope1's SSP is registered as a bootstrap stream, since - // 123=123, it should be marked as "caught up" and treated like a normal - // stream. This means that non-bootstrap stream envelope should be allowed + // Even though envelope1's SSP is registered as a bootstrap stream, since + // 123=123, it should be marked as "caught up" and treated like a normal + // stream. This means that non-bootstrap stream envelope should be allowed // to be chosen. chooser.register(envelope1.getSystemStreamPartition, "1") chooser.register(envelope2.getSystemStreamPartition, null) chooser.start chooser.update(envelope2) - // Choose should not return anything since bootstrapper is blocking + // Choose should not return anything since bootstrapper is blocking // wrapped.choose until it gets an update from envelope1's SSP. - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope1) - // Now that we have an update from the required SSP, the mock chooser + // Now that we have an update from the required SSP, the mock chooser // should be called, and return. assertEquals(envelope2, chooser.choose) - // The chooser still has an envelope from envelope1's SSP, so it should + // The chooser still has an envelope from envelope1's SSP, so it should // return. assertEquals(envelope1, chooser.choose) // No envelope for envelope1's SSP has been given, so it should block. chooser.update(envelope2) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Now we're giving an envelope with the proper last offset (123), so no // envelope1's SSP should be treated no differently than envelope2's. chooser.update(envelope4) assertEquals(envelope2, chooser.choose) assertEquals(envelope4, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Should not block here since there are no more lagging bootstrap streams. chooser.update(envelope2) assertEquals(envelope2, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope2) assertEquals(envelope2, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) } @Test @@ -138,54 +140,54 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy chooser.register(envelope3.getSystemStreamPartition, "1") chooser.start chooser.update(envelope1) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope3) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope2) // Fully loaded now. assertEquals(envelope1, chooser.choose) // Can't pick again because envelope1's SSP is missing. - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope1) // Can pick again. assertEquals(envelope3, chooser.choose) // Can still pick since envelope3.SSP isn't being tracked. assertEquals(envelope2, chooser.choose) // Can't pick since envelope2.SSP needs an envelope now. - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope2) // Now we get envelope1 again. assertEquals(envelope1, chooser.choose) // Can't pick again. - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Now use envelope4, to trigger "all caught up" for envelope1.SSP. chooser.update(envelope4) // Chooser's contents is currently: e2, e4 (System.err.println(mock.getEnvelopes)) // Add envelope3, whose SSP isn't being tracked. chooser.update(envelope3) assertEquals(envelope2, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope2) // Chooser's contents is currently: e4, e3, e2 (System.err.println(mock.getEnvelopes)) assertEquals(envelope4, chooser.choose) - // This should be allowed, even though no message from envelope1.SSP is - // available, since envelope4 triggered "all caught up" because its offset - // matches the offset map for this SSP, and we still have an envelope for + // This should be allowed, even though no message from envelope1.SSP is + // available, since envelope4 triggered "all caught up" because its offset + // matches the offset map for this SSP, and we still have an envelope for // envelope2.SSP in the queue. assertEquals(envelope3, chooser.choose) assertEquals(envelope2, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Fin. } } object TestBootstrappingChooser { - // Test both BatchingChooser and DefaultChooser here. DefaultChooser with - // just batch size defined should behave just like plain vanilla batching + // Test both BatchingChooser and DefaultChooser here. DefaultChooser with + // just batch size defined should behave just like plain vanilla batching // chooser. @Parameters def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = Arrays.asList( Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata)), Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata))) -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala index 884e458..0909956 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala @@ -26,12 +26,13 @@ import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.SystemStreamPartition import org.apache.samza.Partition import org.apache.samza.config.MapConfig -import scala.collection.JavaConversions._ import org.apache.samza.config.DefaultChooserConfig import org.apache.samza.system.SystemStream import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import scala.collection.JavaConversions._ + class TestDefaultChooser { val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1); val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2); @@ -47,7 +48,7 @@ class TestDefaultChooser { val mock0 = new MockMessageChooser val mock1 = new MockMessageChooser val mock2 = new MockMessageChooser - // Create metadata for two envelopes (1 and 5) that are part of the same + // Create metadata for two envelopes (1 and 5) that are part of the same // stream, but have different partitions and offsets. val env1Metadata = new SystemStreamPartitionMetadata(null, "123", null) val env5Metadata = new SystemStreamPartitionMetadata(null, "321", null) @@ -75,28 +76,28 @@ class TestDefaultChooser { chooser.register(envelope2.getSystemStreamPartition, null) chooser.register(envelope3.getSystemStreamPartition, null) chooser.register(envelope5.getSystemStreamPartition, null) - // Add a bootstrap stream that's already caught up. If everything is + // Add a bootstrap stream that's already caught up. If everything is // working properly, it shouldn't interfere with anything. chooser.register(envelope8.getSystemStreamPartition, "654") chooser.start - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Load with a non-bootstrap stream, and should still get null. chooser.update(envelope3) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Load with a bootstrap stream, should get that envelope. chooser.update(envelope1) assertEquals(envelope1, chooser.choose) // Should block envelope3 since we have no message from envelope1's bootstrap stream. - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Load envelope2 from non-bootstrap stream with higher priority than envelope3. chooser.update(envelope2) // Should block envelope2 since we have no message from envelope1's bootstrap stream. - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Test batching by giving chooser envelope1 and envelope5, both from same stream, but envelope1 should be preferred partition. chooser.update(envelope5) @@ -107,14 +108,14 @@ class TestDefaultChooser { chooser.update(envelope1) assertEquals(envelope5, chooser.choose) assertEquals(envelope1, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Now we're back to just envelope3, envelope2. Let's catch up envelope1's SSP using envelope4's offset. chooser.update(envelope4) assertEquals(envelope4, chooser.choose) // Should still block envelopes 1 and 2 because the second partition hasn't caught up yet. - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Now catch up the second partition. chooser.update(envelope6) @@ -135,7 +136,7 @@ class TestDefaultChooser { // Now we should finally get the lowest priority non-bootstrap stream, envelope3. assertEquals(envelope3, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) } @Test @@ -166,4 +167,4 @@ class TestDefaultChooser { class MockBlockingEnvelopeMap extends BlockingEnvelopeMap { def start = Unit def stop = Unit -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala index 01802b9..1329e84 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestRoundRobinChooser.scala @@ -19,15 +19,16 @@ package org.apache.samza.system.chooser -import org.junit.Assert._ -import org.junit.Test +import java.util.Arrays + import org.apache.samza.Partition import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.SystemStreamPartition +import org.junit.Assert._ +import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters -import java.util.Arrays @RunWith(value = classOf[Parameterized]) class TestRoundRobinChooser(getChooser: () => MessageChooser) { @@ -43,12 +44,12 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) { chooser.register(envelope3.getSystemStreamPartition, "123") chooser.start - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Test one message. chooser.update(envelope1) assertEquals(envelope1, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Verify simple ordering. chooser.update(envelope1) @@ -58,7 +59,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) { assertEquals(envelope1, chooser.choose) assertEquals(envelope2, chooser.choose) assertEquals(envelope3, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Verify mixed ordering. chooser.update(envelope2) @@ -72,7 +73,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) { assertEquals(envelope1, chooser.choose) assertEquals(envelope2, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Verify simple ordering with different starting envelope. chooser.update(envelope2) @@ -82,7 +83,7 @@ class TestRoundRobinChooser(getChooser: () => MessageChooser) { assertEquals(envelope2, chooser.choose) assertEquals(envelope1, chooser.choose) assertEquals(envelope3, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) } } @@ -92,4 +93,4 @@ object TestRoundRobinChooser { // plain vanilla round robin chooser. @Parameters def parameters: java.util.Collection[Array[() => MessageChooser]] = Arrays.asList(Array(() => new RoundRobinChooser), Array(() => new DefaultChooser)) -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala index 4cde630..3e435ae 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestTieredPriorityChooser.scala @@ -19,18 +19,20 @@ package org.apache.samza.system.chooser +import java.util.Arrays + import org.junit.Assert._ import org.junit.Test -import org.apache.samza.system.IncomingMessageEnvelope -import scala.collection.immutable.Queue -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.Partition -import org.apache.samza.SamzaException import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters -import java.util.Arrays +import org.apache.samza.Partition +import org.apache.samza.SamzaException +import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.SystemStream +import org.apache.samza.system.SystemStreamPartition + +import scala.collection.immutable.Queue @RunWith(value = classOf[Parameterized]) class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser) { @@ -68,10 +70,10 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me chooser.register(envelope1.getSystemStreamPartition, null) chooser.start - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope1) assertEquals(envelope1, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) } @Test @@ -85,7 +87,7 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me // The SSP for envelope2 is not defined as a priority stream. chooser.register(envelope2.getSystemStreamPartition, null) chooser.start - assertEquals(null, chooser.choose) + assertNull(chooser.choose) try { chooser.update(envelope2) @@ -106,18 +108,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me chooser.register(envelope1.getSystemStreamPartition, null) chooser.start - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope1) chooser.update(envelope4) assertEquals(envelope1, chooser.choose) assertEquals(envelope4, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope4) chooser.update(envelope1) assertEquals(envelope4, chooser.choose) assertEquals(envelope1, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) } @Test @@ -132,18 +134,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me chooser.register(envelope3.getSystemStreamPartition, null) chooser.start - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope2) chooser.update(envelope3) assertEquals(envelope2, chooser.choose) assertEquals(envelope3, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope3) chooser.update(envelope2) assertEquals(envelope3, chooser.choose) assertEquals(envelope2, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) } @Test @@ -160,30 +162,30 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me chooser.register(envelope2.getSystemStreamPartition, null) chooser.start - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope1) chooser.update(envelope4) assertEquals(envelope1, chooser.choose) assertEquals(envelope4, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope4) chooser.update(envelope1) assertEquals(envelope4, chooser.choose) assertEquals(envelope1, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope2) chooser.update(envelope4) assertEquals(envelope2, chooser.choose) assertEquals(envelope4, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope1) chooser.update(envelope2) assertEquals(envelope1, chooser.choose) assertEquals(envelope2, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) } @Test @@ -203,18 +205,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me chooser.register(envelope2.getSystemStreamPartition, null) chooser.start - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope1) chooser.update(envelope4) assertEquals(envelope1, chooser.choose) assertEquals(envelope4, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope4) chooser.update(envelope1) assertEquals(envelope4, chooser.choose) assertEquals(envelope1, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope2) chooser.update(envelope4) @@ -222,18 +224,18 @@ class TestTieredPriorityChooser(getChooser: (Map[SystemStream, Int], Map[Int, Me // priority. assertEquals(envelope4, chooser.choose) assertEquals(envelope2, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) chooser.update(envelope1) chooser.update(envelope2) assertEquals(envelope1, chooser.choose) assertEquals(envelope2, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) // Just the low priority stream. chooser.update(envelope2) assertEquals(envelope2, chooser.choose) - assertEquals(null, chooser.choose) + assertNull(chooser.choose) } } @@ -245,4 +247,4 @@ object TestTieredPriorityChooser { def parameters: java.util.Collection[Array[(Map[SystemStream, Int], Map[Int, MessageChooser], MessageChooser) => MessageChooser]] = Arrays.asList( Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new TieredPriorityChooser(priorities, choosers, default)), Array((priorities: Map[SystemStream, Int], choosers: Map[Int, MessageChooser], default: MessageChooser) => new DefaultChooser(default, None, priorities, choosers))) -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala index fb26bfc..525d126 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala @@ -19,20 +19,22 @@ package org.apache.samza.system.filereader -import org.junit.Assert._ -import scala.collection.JavaConversions._ import java.io.PrintWriter import java.io.File -import org.scalatest.junit.AssertionsForJUnit -import org.junit.Test -import org.junit.Before -import org.junit.After import java.io.RandomAccessFile + +import org.apache.samza.SamzaException import org.apache.samza.system.SystemStreamPartition import org.apache.samza.Partition import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata +import org.junit.Assert._ +import org.junit.Test +import org.junit.Before +import org.junit.After +import org.scalatest.junit.AssertionsForJUnit + import scala.collection.mutable.HashMap -import org.apache.samza.SamzaException +import scala.collection.JavaConversions._ class TestFileReaderSystemAdmin extends AssertionsForJUnit { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala index f505eb1..5707bb4 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala @@ -22,12 +22,14 @@ package org.apache.samza.system.filereader import java.io.File import java.io.FileWriter import java.io.PrintWriter + import org.apache.samza.Partition import org.apache.samza.system.SystemStreamPartition import org.junit.AfterClass import org.junit.Assert._ import org.junit.BeforeClass import org.junit.Test + import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala index 330df78..c3295f3 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala @@ -19,11 +19,12 @@ package org.apache.samza.system.filereader +import org.apache.samza.SamzaException import org.junit.Assert._ -import scala.collection.JavaConversions._ -import org.scalatest.junit.AssertionsForJUnit import org.junit.Test -import org.apache.samza.SamzaException +import org.scalatest.junit.AssertionsForJUnit + +import scala.collection.JavaConversions._ class TestFileReaderSystemFactory extends AssertionsForJUnit { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala index 7cfeb5a..c141b5f 100644 --- a/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/task/TestReadableCoordinator.scala @@ -19,10 +19,10 @@ package org.apache.samza.task -import org.junit.Assert._ -import org.junit.Test import org.apache.samza.task.TaskCoordinator.RequestScope import org.apache.samza.container.TaskName +import org.junit.Assert._ +import org.junit.Test class TestReadableCoordinator { val taskName = new TaskName("P0") http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala index 6353378..ee56e20 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.samza.util import org.junit.Assert._ @@ -28,9 +29,9 @@ class TestDaemonThreadFactory { val dtf = new DaemonThreadFactory(testThreadName) val threadWithName = dtf.newThread(new Runnable { def run() { - //Not testing this particular method + // Not testing this particular method } }) - assertEquals(threadWithName.getName, ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX+testThreadName) + assertEquals(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX + testThreadName, threadWithName.getName) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala index 4a561d1..9ba8a4d 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestExponentialSleepStrategy.scala @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,14 +15,13 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ package org.apache.samza.util +import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop import org.junit.Assert._ import org.junit.Test -import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop class TestExponentialSleepStrategy { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala index 7c314ce..8c21901 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.samza.util import org.apache.samza.Partition @@ -31,6 +32,7 @@ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.util.Util._ import org.junit.Assert._ import org.junit.Test + import scala.collection.JavaConversions._ import scala.util.Random http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala index 7a23041..b76d5ad 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala @@ -18,10 +18,10 @@ */ package org.apache.samza.checkpoint.kafka +import org.apache.samza.SamzaException import org.apache.samza.container.TaskName import org.junit.Assert._ import org.junit.{Before, Test} -import org.apache.samza.SamzaException class TestKafkaCheckpointLogKey { @Before http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index 4827731..553d6b4 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -19,6 +19,7 @@ package org.apache.samza.checkpoint.kafka +import kafka.admin.AdminUtils import kafka.common.InvalidMessageSizeException import kafka.common.UnknownTopicOrPartitionException import kafka.message.InvalidMessageException @@ -31,20 +32,20 @@ import kafka.utils.TestZKUtils import kafka.utils.Utils import kafka.utils.ZKStringSerializer import kafka.zk.EmbeddedZookeeper + import org.I0Itec.zkclient.ZkClient import org.apache.samza.checkpoint.Checkpoint +import org.apache.samza.config.MapConfig import org.apache.samza.container.TaskName +import org.apache.samza.container.grouper.stream.GroupByPartitionFactory import org.apache.samza.serializers.CheckpointSerde import org.apache.samza.system.SystemStreamPartition import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore } import org.apache.samza.{ SamzaException, Partition } import org.junit.Assert._ import org.junit.{ AfterClass, BeforeClass, Test } -import scala.collection.JavaConversions._ + import scala.collection._ -import org.apache.samza.container.grouper.stream.GroupByPartitionFactory -import kafka.admin.AdminUtils -import org.apache.samza.config.MapConfig import scala.collection.JavaConversions._ object TestKafkaCheckpointManager { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 468aa3d..8109f73 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -19,15 +19,18 @@ package org.apache.samza.config -import org.junit.Assert._ -import org.junit.Test import java.net.URI import java.io.File import java.util.Properties -import scala.collection.JavaConversions._ -import org.apache.samza.config.factories.PropertiesConfigFactory + import kafka.consumer.ConsumerConfig +import org.apache.samza.config.factories.PropertiesConfigFactory +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.JavaConversions._ + class TestKafkaConfig { @Test http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala index fabae68..5cf82c2 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala @@ -18,10 +18,12 @@ */ package org.apache.samza.config + +import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde import org.junit.Assert._ import org.junit.Test + import scala.collection.JavaConversions._ -import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde class TestKafkaSerdeConfig { val MAGIC_VAL = "1000" @@ -33,7 +35,7 @@ class TestKafkaSerdeConfig { @Test def testKafkaConfigurationIsBackwardsCompatible { - assert(config.getKafkaEncoder("test").getOrElse("").equals(MAGIC_VAL)) - assert(config.getKafkaDecoder("test").getOrElse("").equals(MAGIC_VAL)) + assertEquals(MAGIC_VAL, config.getKafkaEncoder("test").getOrElse("")) + assertEquals(MAGIC_VAL, config.getKafkaDecoder("test").getOrElse("")) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/811f2897/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala index 77cdbe3..89ced34 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala @@ -19,11 +19,13 @@ package org.apache.samza.config -import org.junit.Test import collection.JavaConversions._ + +import org.apache.samza.SamzaException import org.junit.Assert._ +import org.junit.Test + import KafkaConfig._ -import org.apache.samza.SamzaException class TestRegExTopicGenerator {
