Repository: samza Updated Branches: refs/heads/master 5a88b9e47 -> ffd04d9d6
SAMZA-1100; Exception when using a stream as both bootstrap and broadcast. When a task input stream is used as both a broadcast and a bootstrap stream in a Samza job, BootstrappingChooser marks the stream as bootstrapped when a single task finishes consuming all the SystemStreamPartitions (this happens when the starting offsets of all partitions in the input stream are of type upcoming). This patch fixes this by marking a stream as bootstrapped only when all the SystemStreamPartitions in an input stream have been consumed by all the expected tasks. More details here: https://issues.apache.org/jira/browse/SAMZA-1100 Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Prateek Maheshwari <prate...@linkedin.com> Closes #68 from shanthoosh/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ffd04d9d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ffd04d9d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ffd04d9d Branch: refs/heads/master Commit: ffd04d9d674437b44d438e3e372aa0cbbf6b7044 Parents: 5a88b9e Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Mon Mar 13 12:07:45 2017 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Mon Mar 13 12:07:45 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/container/SamzaContainer.scala | 2 +- .../system/chooser/BootstrappingChooser.scala | 59 +++++++++++++++----- .../samza/system/chooser/DefaultChooser.scala | 23 +++++--- .../chooser/TestBootstrappingChooser.scala | 37 ++++++++++-- .../system/chooser/TestDefaultChooser.scala | 6 +- 5 files changed, 98 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 89522dc..b00e10f 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -354,7 +354,7 @@ object SamzaContainer extends Logging { val chooserFactory = Util.getObj[MessageChooserFactory](chooserFactoryClassName) - val chooser = DefaultChooser(inputStreamMetadata, chooserFactory, config, samzaContainerMetrics.registry) + val chooser = DefaultChooser(inputStreamMetadata, chooserFactory, config, samzaContainerMetrics.registry, systemAdmins) info("Setting up metrics reporters.") http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala index fc99c84..f71bcfb 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala @@ -19,19 +19,21 @@ package org.apache.samza.system.chooser -import java.util.concurrent.atomic.AtomicInteger +import org.apache.samza.system.SystemAdmin import org.apache.samza.system.SystemStream +import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.IncomingMessageEnvelope +import org.apache.samza.SamzaException import org.apache.samza.util.Logging import org.apache.samza.metrics.MetricsHelper import org.apache.samza.metrics.MetricsRegistryMap import 
org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.system.SystemStreamMetadata -import scala.collection.JavaConversions._ -import org.apache.samza.SamzaException import org.apache.samza.system.SystemStreamMetadata.OffsetType +import scala.collection.JavaConversions._ +import scala.collection.mutable + /** * BootstrappingChooser is a composable MessageChooser that only chooses * an envelope when it's received at least one envelope for each SystemStream. @@ -69,7 +71,13 @@ class BootstrappingChooser( /** * An object that holds all of the metrics related to bootstrapping. */ - metrics: BootstrappingChooserMetrics = new BootstrappingChooserMetrics) extends MessageChooser with Logging { + metrics: BootstrappingChooserMetrics = new BootstrappingChooserMetrics, + + /** + * A map from system stream name to SystemAdmin that is used for + * offset comparisons. + */ + systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging { /** * The number of lagging partitions for each SystemStream that's behind. @@ -91,9 +99,10 @@ class BootstrappingChooser( .toSet /** - * Store all the systemStreamPartitions registered + * Mapping from the systemStreamPartition to the lowest registered offset. + * When multiple offsets are registered for a system stream partition, lowest offset is chosen. */ - var registeredSystemStreamPartitions = Set[SystemStreamPartition]() + var registeredSystemStreamPartitions = mutable.Map[SystemStreamPartition, String]() /** * The number of lagging partitions that the underlying wrapped chooser has @@ -102,8 +111,16 @@ class BootstrappingChooser( var updatedSystemStreams = Map[SystemStream, Int]() def start = { + for ((systemStreamPartition, offset) <- registeredSystemStreamPartitions) { + // If the offset we're starting to consume from is the same as the upcoming + // offset for this system stream partition, then we've already read all + // messages in the stream, and we're at head for this system stream + // partition. 
+ checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING) + } + // remove the systemStreamPartitions not registered. - laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.contains(_)) + laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.keys.contains(_)) systemStreamLagCounts = laggingSystemStreamPartitions.groupBy(_.getSystemStream).map {case (systemStream, ssps) => systemStream -> ssps.size} debug("Starting bootstrapping chooser with bootstrap metadata: %s" format bootstrapStreamMetadata) @@ -120,15 +137,27 @@ class BootstrappingChooser( override def register(systemStreamPartition: SystemStreamPartition, offset: String) { debug("Registering stream partition with offset: %s, %s" format (systemStreamPartition, offset)) - // If the offset we're starting to consume from is the same as the upcoming - // offset for this system stream partition, then we've already read all - // messages in the stream, and we're at head for this system stream - // partition. - checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING) - wrapped.register(systemStreamPartition, offset) - registeredSystemStreamPartitions += systemStreamPartition + val system = systemStreamPartition.getSystem + val systemAdmin = systemAdmins.getOrElse(system, throw new SamzaException("SystemAdmin is undefined for System: %s" format system)) + /** + * SAMZA-1100: When a input SystemStream is consumed as both bootstrap and broadcast + * BootstrappingChooser should record the lowest offset for each registered SystemStreamPartition. + * When multiple tasks processing a SystemStreamPartition runs within a container + * and share the same chooser, then the lowest offset should be chosen as starting offset. 
+ */ + if (!registeredSystemStreamPartitions.contains(systemStreamPartition)) { + registeredSystemStreamPartitions += systemStreamPartition -> offset + } else if (offset != null) { + val existingOffset = registeredSystemStreamPartitions(systemStreamPartition) + val comparatorResult: Integer = systemAdmin.offsetComparator(existingOffset, offset) + if (comparatorResult == null) { + warn("Existing offset: %s and incoming offset: %s of system stream partition: %s are not comparable." format (existingOffset, offset, systemStreamPartition)) + } else if (comparatorResult > 0) { + registeredSystemStreamPartitions += systemStreamPartition -> offset + } + } } def update(envelope: IncomingMessageEnvelope) { http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala index b433713..c0805c4 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala @@ -22,14 +22,14 @@ package org.apache.samza.system.chooser import org.apache.samza.SamzaException import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfigJava} import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap} -import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition} +import org.apache.samza.system.{IncomingMessageEnvelope, SystemAdmin, SystemStream, SystemStreamMetadata, SystemStreamPartition} import org.apache.samza.util.Logging import scala.collection.JavaConverters._ object DefaultChooser extends Logging { - def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: 
MessageChooserFactory, config: Config, registry: MetricsRegistry) = { + def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry, systemAdmins: Map[String, SystemAdmin]) = { val chooserConfig = new DefaultChooserConfig(config) val batchSize = if (chooserConfig.getChooserBatchSize > 0) Some(chooserConfig.getChooserBatchSize) else None @@ -65,8 +65,8 @@ object DefaultChooser extends Logging { debug("Got bootstrap stream metadata: %s" format bootstrapStreamMetadata) val priorities = if (usePriority) { - // Ordering is important here. Overrides Int.MaxValue default for - // bootstrap streams with explicitly configured values, in cases where + // Ordering is important here. Overrides Int.MaxValue default for + // bootstrap streams with explicitly configured values, in cases where // users have defined a bootstrap stream's priority in config. defaultPrioritizedStreams ++ prioritizedBootstrapStreams ++ prioritizedStreams } else { @@ -87,7 +87,8 @@ object DefaultChooser extends Logging { priorities, prioritizedChoosers, bootstrapStreamMetadata, - registry) + registry, + systemAdmins) } } @@ -244,7 +245,13 @@ class DefaultChooser( /** * Metrics registry to be used when wiring up wrapped choosers. */ - registry: MetricsRegistry = new MetricsRegistryMap) extends MessageChooser with Logging { + registry: MetricsRegistry = new MetricsRegistryMap, + + /** + * Defines a mapping from SystemStream name to SystemAdmin. + * This is useful for determining if a bootstrap SystemStream is caught up. 
+ */ + systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging { val chooser = { val useBatching = batchSize.isDefined @@ -256,7 +263,7 @@ class DefaultChooser( val maybePrioritized = if (usePriority) { new TieredPriorityChooser(prioritizedStreams, prioritizedChoosers, DefaultChooser) } else if (DefaultChooser == null) { - // Null wrapped chooser without a priority chooser is not allowed + // Null wrapped chooser without a priority chooser is not allowed // because DefaultChooser needs an underlying message chooser. throw new SamzaException("A null chooser was given to the DefaultChooser. This is not allowed unless you are using prioritized/bootstrap streams, which you're not.") } else { @@ -270,7 +277,7 @@ class DefaultChooser( } if (useBootstrapping) { - new BootstrappingChooser(maybeBatching, bootstrapStreamMetadata, new BootstrappingChooserMetrics(registry)) + new BootstrappingChooser(maybeBatching, bootstrapStreamMetadata, new BootstrappingChooserMetrics(registry), systemAdmins) } else { maybeBatching } http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala index 2e0180d..2a095ce 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala @@ -24,6 +24,8 @@ import java.util.Arrays import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.SystemStreamPartition import org.apache.samza.Partition +import org.apache.samza.container.MockSystemAdmin +import org.apache.samza.metrics.MetricsRegistryMap import 
org.apache.samza.system.SystemStream import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata @@ -34,7 +36,6 @@ import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters import scala.collection.JavaConversions._ -import scala.collection.immutable.Queue @RunWith(value = classOf[Parameterized]) class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) { @@ -186,7 +187,7 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy val mock = new MockMessageChooser val metadata1 = getMetadata(envelope1, "123") val metadata2 = getMetadata(envelope2, "321") - val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2)) + val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin)) chooser.register(envelope1.getSystemStreamPartition, "1") chooser.register(envelope2.getSystemStreamPartition, "1") @@ -198,6 +199,34 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy val expectedSystemStreamLagCounts = Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, envelope2.getSystemStreamPartition.getSystemStream -> 1) assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts) } + + @Test + def testChooserRegisterWithStreamUsedAsBootstrapAndBroadcast: Unit = { + val mock = new MockMessageChooser + val metadata1 = getMetadata(envelope1, "123") + val metadata2 = getMetadata(envelope2, "321") + val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, 
envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin)) + + // Envelope1 is registered by multiple tasks, each one of them having different offsets. + chooser.register(envelope1.getSystemStreamPartition, "1") + chooser.register(envelope1.getSystemStreamPartition, "2") + chooser.register(envelope1.getSystemStreamPartition, null) + + // Envelope2 is registered by multiple tasks, each one of them having different offsets. + chooser.register(envelope2.getSystemStreamPartition, "1") + chooser.register(envelope2.getSystemStreamPartition, "2") + chooser.register(envelope2.getSystemStreamPartition, null) + + + chooser.start + + // it should only contain stream partition 0 and stream1 partition 1 + val expectedLaggingSsps = Set(envelope1.getSystemStreamPartition, envelope2.getSystemStreamPartition) + assertEquals(expectedLaggingSsps, chooser.laggingSystemStreamPartitions) + val expectedSystemStreamLagCounts = Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, envelope2.getSystemStreamPartition.getSystemStream -> 1) + assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts) + + } } object TestBootstrappingChooser { @@ -206,6 +235,6 @@ object TestBootstrappingChooser { // chooser. 
@Parameters def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = Arrays.asList( - Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata)), - Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata))) + Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata, new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))), + Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata, registry = new MetricsRegistryMap(), systemAdmins = Map("kafka" -> new MockSystemAdmin)))) } http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala index 7fb70b2..5a04469 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala @@ -21,6 +21,8 @@ package org.apache.samza.system.chooser import org.apache.samza.Partition import org.apache.samza.config.{DefaultChooserConfig, MapConfig} +import org.apache.samza.container.MockSystemAdmin +import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import 
org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition} import org.apache.samza.util.BlockingEnvelopeMap @@ -66,7 +68,9 @@ class TestDefaultChooser { 1 -> mock2), Map( envelope1.getSystemStreamPartition.getSystemStream -> streamMetadata, - envelope8.getSystemStreamPartition.getSystemStream -> stream3Metadata)) + envelope8.getSystemStreamPartition.getSystemStream -> stream3Metadata), + new MetricsRegistryMap(), + Map("kafka" -> new MockSystemAdmin())) chooser.register(envelope1.getSystemStreamPartition, null) chooser.register(envelope2.getSystemStreamPartition, null)