Repository: samza Updated Branches: refs/heads/master 373ae1c6d -> a4174309a
SAMZA-1317: Changelog validation error for topics with period in the Author: Jacob Maes <jm...@linkedin.com> Reviewers: Prateek Maheshwari <pmahe...@linkedin.com> Closes #213 from jmakes/samza-1317-migration-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a4174309 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a4174309 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a4174309 Branch: refs/heads/master Commit: a4174309a495ff1e43c3d4c82cababa54dd71d54 Parents: 373ae1c Author: Jacob Maes <jm...@linkedin.com> Authored: Fri Jun 2 20:42:14 2017 -0700 Committer: Jacob Maes <jm...@linkedin.com> Committed: Fri Jun 2 20:42:14 2017 -0700 ---------------------------------------------------------------------- .../samza/system/kafka/KafkaStreamSpec.java | 7 +++++-- .../samza/system/kafka/KafkaSystemAdmin.scala | 4 +++- .../system/kafka/TestKafkaSystemAdminJava.java | 19 +++++++++++++++++++ 3 files changed, 27 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a4174309/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java index 0477854..c7e82f7 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java @@ -116,12 +116,15 @@ public class KafkaStreamSpec extends StreamSpec { /** * Convenience constructor to create a KafkaStreamSpec with just a topicName, systemName, and partitionCount. * + * @param id The application-unique logical identifier for the stream. It is used to distinguish between + * streams in a Samza application so it must be unique in the context of one deployable unit. + * It does not need to be globally unique or unique with respect to a host. * @param topicName The name of the topic. * @param systemName The name of the System. See {@link org.apache.samza.system.SystemFactory} * @param partitionCount The number of partitions. */ - public KafkaStreamSpec(String topicName, String systemName, int partitionCount) { - this(topicName, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, new Properties()); + public KafkaStreamSpec(String id, String topicName, String systemName, int partitionCount) { + this(id, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, new Properties()); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/a4174309/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 2f82754..af77d5b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -21,6 +21,7 @@ package org.apache.samza.system.kafka import java.util import java.util.{Properties, UUID} + import kafka.admin.AdminUtils import kafka.api._ import kafka.common.TopicAndPartition @@ -31,6 +32,7 @@ import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadat import org.apache.samza.system._ import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging} import org.apache.samza.{Partition, SamzaException} + import scala.collection.JavaConverters._ @@ -514,7 +516,7 @@ class KafkaSystemAdmin( * will auto-create a new topic. */ override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = { - validateStream(new KafkaStreamSpec(topicName, systemName, numKafkaChangelogPartitions)) + validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions)) } /** http://git-wip-us.apache.org/repos/asf/samza/blob/a4174309/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java index a47ba9d..ce59b40 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java @@ -132,6 +132,25 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { } @Test + public void testValidateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() { + final String STREAM = "test.Change_Log.Validate"; + Properties coordProps = new Properties(); + Map<String, ChangelogInfo> changeLogMap = new HashMap<>(); + changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties())); + + KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)); + SystemAdmin admin = Mockito.spy(systemAdmin); + StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12); + + admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount()); + admin.validateStream(spec); + admin.validateChangelogStream(STREAM, spec.getPartitionCount()); // Should not throw + + Mockito.verify(admin).createStream(Mockito.any()); + Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any()); + } + + @Test public void testCreateStream() { SystemAdmin admin = this.basicSystemAdmin; StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);