This is an automated email from the ASF dual-hosted git repository. shanthoosh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 275d757 SAMZA-2129: Move the offset comparison check to SystemConsumers layer. (#954) 275d757 is described below commit 275d7570bc65aa3a0592ea3a6ec4063b91c4b222 Author: shanthoosh <svenkatara...@linkedin.com> AuthorDate: Mon Mar 18 15:13:01 2019 -0700 SAMZA-2129: Move the offset comparison check to SystemConsumers layer. (#954) 1. Currently, this check is duplicated in some of the SystemConsumer implementations. 2. Some SystemConsumer implementations do not perform this offsetComparator check in their implementation of the register method. Moving the check one level up, from the SystemConsumer.register(SystemStreamPartition, offset) API implementation to the SystemConsumers.register(SystemStreamPartition, offset) API implementation, removes unnecessary duplication and ensures functional correctness. --- .../apache/samza/container/SamzaContainer.scala | 1 + .../samza/storage/ContainerStorageManager.java | 2 +- .../org/apache/samza/system/SystemConsumers.scala | 21 +++++++- .../org/apache/samza/task/TestAsyncRunLoop.java | 10 +++- .../samza/processor/StreamProcessorTestUtils.scala | 2 +- .../apache/samza/system/TestSystemConsumers.scala | 62 +++++++++++++++------- 6 files changed, 75 insertions(+), 23 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index ab89396..70ff87d 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -464,6 +464,7 @@ object SamzaContainer extends Logging { val consumerMultiplexer = new SystemConsumers( chooser = chooser, consumers = consumers, + systemAdmins = systemAdmins, serdeManager = serdeManager, metrics = systemConsumersMetrics, dropDeserializationError = dropDeserializationError, diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index da61a35..a9443b4 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -237,7 +237,7 @@ public class ContainerStorageManager { sideInputSystemConsumersMetrics.registry(), systemAdmins); sideInputSystemConsumers = - new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager, + new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(this.sideInputConsumers), systemAdmins, serdeManager, sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(), SystemConsumers.DEFAULT_POLL_INTERVAL_MS(), ScalaJavaUtil.toScalaFunction(() -> System.nanoTime())); } diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala index 68b2f09..8408433 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala @@ -63,6 +63,11 @@ class SystemConsumers ( consumers: Map[String, SystemConsumer], /** + * Provides a mapping from system name to a {@see SystemAdmin}. + */ + systemAdmins: SystemAdmins, + + /** * The class that handles deserialization of incoming messages. */ serdeManager: SerdeManager = new SerdeManager, @@ -111,6 +116,11 @@ class SystemConsumers ( val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtil { /** + * Mapping from the {@see SystemStreamPartition} to the registered offsets. + */ + private val sspToRegisteredOffsets = new HashMap[SystemStreamPartition, String]() + + /** * A buffer of incoming messages grouped by SystemStreamPartition. 
These * messages are handed out to the MessageChooser as it needs them. */ @@ -154,6 +164,11 @@ class SystemConsumers ( metrics.setUnprocessedMessages(() => totalUnprocessedMessages) def start { + for ((systemStreamPartition, offset) <- sspToRegisteredOffsets.asScala) { + val consumer = consumers(systemStreamPartition.getSystem) + consumer.register(systemStreamPartition, offset) + } + debug("Starting consumers.") emptySystemStreamPartitionsBySystem.asScala ++= unprocessedMessagesBySSP .keySet @@ -208,7 +223,11 @@ class SystemConsumers ( if (startpoint != null) { consumer.register(systemStreamPartition, startpoint) } else { - consumer.register(systemStreamPartition, offset) + val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition) + val systemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem) + if (existingOffset == null || systemAdmin.offsetComparator(existingOffset, offset) > 0) { + sspToRegisteredOffsets.put(systemStreamPartition, offset) + } } } catch { case e: NoSuchElementException => throw new SystemConsumersException("can't register " + systemStreamPartition.getSystem + "'s consumer.", e) diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index 48f8619..6197690 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -41,6 +41,8 @@ import org.apache.samza.context.JobContext; import org.apache.samza.job.model.TaskModel; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemConsumers; import org.apache.samza.system.SystemStreamPartition; @@ -48,6 +50,7 @@ import 
org.apache.samza.system.TestSystemConsumers; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import org.mockito.Mockito; import scala.Option; import scala.collection.JavaConverters; @@ -636,9 +639,14 @@ public class TestAsyncRunLoop { SystemConsumer mockConsumer = mock(SystemConsumer.class); when(mockConsumer.poll(anyObject(), anyLong())).thenReturn(sspMap); + SystemAdmins systemAdmins = Mockito.mock(SystemAdmins.class); + Mockito.when(systemAdmins.getSystemAdmin("system1")).thenReturn(Mockito.mock(SystemAdmin.class)); + Mockito.when(systemAdmins.getSystemAdmin("testSystem")).thenReturn(Mockito.mock(SystemAdmin.class)); + HashMap<String, SystemConsumer> systemConsumerMap = new HashMap<>(); systemConsumerMap.put("system1", mockConsumer); - SystemConsumers consumers = TestSystemConsumers.getSystemConsumers(systemConsumerMap); + + SystemConsumers consumers = TestSystemConsumers.getSystemConsumers(systemConsumerMap, systemAdmins); TaskName taskName1 = new TaskName("task1"); TaskName taskName2 = new TaskName("task2"); diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala index 3ff651b..5ab7635 100644 --- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala +++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala @@ -41,7 +41,7 @@ object StreamProcessorTestUtils { val adminMultiplexer = new SystemAdmins(config) val consumerMultiplexer = new SystemConsumers( new RoundRobinChooser, - Map[String, SystemConsumer]()) + Map[String, SystemConsumer](), SystemAdmins.empty()) val producerMultiplexer = new SystemProducers( Map[String, SystemProducer](), new SerdeManager) diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala index 0732311..15e2627 
100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala @@ -44,7 +44,10 @@ class TestSystemConsumers { val envelope = new IncomingMessageEnvelope(systemStreamPartition0, "1", "k", "v") val consumer = new CustomPollResponseSystemConsumer(envelope) var now = 0L - val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), + val systemAdmins = Mockito.mock(classOf[SystemAdmins]) + Mockito.doReturn(Mockito.mock(classOf[SystemAdmin])).when(systemAdmins.getSystemAdmin(system)) + + val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), systemAdmins, new SerdeManager, new SystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT, SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR, @@ -106,7 +109,10 @@ class TestSystemConsumers { val envelope = new IncomingMessageEnvelope(systemStreamPartition, "1", "k", "v") val consumer = new CustomPollResponseSystemConsumer(envelope) var now = 0 - val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), + val systemAdmins = Mockito.mock(classOf[SystemAdmins]) + Mockito.doReturn(Mockito.mock(classOf[SystemAdmin])).when(systemAdmins.getSystemAdmin(system)) + + val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), systemAdmins, new SerdeManager, new SystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT, SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR, @@ -167,13 +173,16 @@ class TestSystemConsumers { def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]().asJava }) + val systemAdmins = Mockito.mock(classOf[SystemAdmins]) + Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin])) + val consumers = new SystemConsumers(new MessageChooser { def 
update(envelope: IncomingMessageEnvelope) = Unit def choose = null def start = chooserStarted += 1 def stop = chooserStopped += 1 def register(systemStreamPartition: SystemStreamPartition, offset: String) = chooserRegistered += systemStreamPartition -> offset - }, consumer, null) + }, consumer, systemAdmins) consumers.register(systemStreamPartition, "0", null) consumers.start @@ -231,15 +240,17 @@ class TestSystemConsumers { val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1)) val msgChooser = new DefaultChooser val consumer = Map(system -> new SerializingConsumer) - val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]); + val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]]) val serdeManager = new SerdeManager(systemMessageSerdes = systemMessageSerdes) + val systemAdmins = Mockito.mock(classOf[SystemAdmins]) + Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin])) // throw exceptions when the deserialization has error - val consumers = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = false) + val consumers = new SystemConsumers(msgChooser, consumer, systemAdmins, serdeManager, dropDeserializationError = false) consumers.register(systemStreamPartition, "0", null) - consumer(system).putBytesMessage - consumer(system).putStringMessage consumers.start + consumer(system).putStringMessage + consumer(system).putBytesMessage var caughtRightException = false try { @@ -248,16 +259,16 @@ class TestSystemConsumers { case e: SystemConsumersException => caughtRightException = true case _: Throwable => caughtRightException = false } - assertTrue("suppose to throw SystemConsumersException", caughtRightException); + assertTrue("suppose to throw SystemConsumersException", caughtRightException) consumers.stop // it should not throw exceptions when deserializaion fails if 
dropDeserializationError is set to true - val consumers2 = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = true) + val consumers2 = new SystemConsumers(msgChooser, consumer, systemAdmins, serdeManager, dropDeserializationError = true) consumers2.register(systemStreamPartition, "0", null) + consumers2.start consumer(system).putBytesMessage consumer(system).putStringMessage consumer(system).putBytesMessage - consumers2.start var notThrowException = true; try { @@ -299,8 +310,10 @@ class TestSystemConsumers { val normalEnvelope = new IncomingMessageEnvelope(systemStreamPartition1, "1", "k", "v") val endOfStreamEnvelope = IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition2) val consumer = new CustomPollResponseSystemConsumer(normalEnvelope) + val systemAdmins = Mockito.mock(classOf[SystemAdmins]) + Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin])) val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), - new SerdeManager, new SystemConsumersMetrics, + systemAdmins, new SerdeManager, new SystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT, SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR, SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0) @@ -350,8 +363,11 @@ class TestSystemConsumers { val consumer = Mockito.mock(classOf[SystemConsumer]) val startpoint = Mockito.mock(classOf[Startpoint]) + val systemAdmins = Mockito.mock(classOf[SystemAdmins]) + Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin])) + val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), - new SerdeManager, new SystemConsumersMetrics, + systemAdmins, new SerdeManager, new SystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT, SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR, SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0) @@ -394,20 +410,28 @@ 
class TestSystemConsumers { */ private class SerializingConsumer extends BlockingEnvelopeMap { val systemStreamPartition = new SystemStreamPartition("test-system", "some-stream", new Partition(1)) - def putBytesMessage { + def putBytesMessage() { put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "0", "test".getBytes())) } - def putStringMessage { + def putStringMessage() { put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "1", "test")) } - def start {} - def stop {} - def register { super.register(systemStreamPartition, "0") } + def start() {} + def stop() {} + + override def register(systemStreamPartition: SystemStreamPartition, offset: String): Unit = { + super[BlockingEnvelopeMap].register(systemStreamPartition, offset) + } + + override def register(systemStreamPartition: SystemStreamPartition, startpoint: Startpoint): Unit = { + super[BlockingEnvelopeMap].register(systemStreamPartition, startpoint) + } + } } object TestSystemConsumers { - def getSystemConsumers(consumers: java.util.Map[String, SystemConsumer]) : SystemConsumers = { - new SystemConsumers(new DefaultChooser, consumers.asScala.toMap) + def getSystemConsumers(consumers: java.util.Map[String, SystemConsumer], systemAdmins: SystemAdmins = SystemAdmins.empty()) : SystemConsumers = { + new SystemConsumers(new DefaultChooser, consumers.asScala.toMap, systemAdmins) } }