Repository: samza
Updated Branches:
  refs/heads/master 373ae1c6d -> a4174309a


SAMZA-1317: Changelog validation error for topics with period in the

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Prateek Maheshwari <pmahe...@linkedin.com>

Closes #213 from jmakes/samza-1317-migration-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a4174309
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a4174309
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a4174309

Branch: refs/heads/master
Commit: a4174309a495ff1e43c3d4c82cababa54dd71d54
Parents: 373ae1c
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Jun 2 20:42:14 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Fri Jun 2 20:42:14 2017 -0700

----------------------------------------------------------------------
 .../samza/system/kafka/KafkaStreamSpec.java      |  7 +++++--
 .../samza/system/kafka/KafkaSystemAdmin.scala    |  4 +++-
 .../system/kafka/TestKafkaSystemAdminJava.java   | 19 +++++++++++++++++++
 3 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a4174309/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index 0477854..c7e82f7 100644
--- 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -116,12 +116,15 @@ public class KafkaStreamSpec extends StreamSpec {
   /**
    * Convenience constructor to create a KafkaStreamSpec with just a 
topicName, systemName, and partitionCount.
    *
+   * @param id              The application-unique logical identifier for the 
stream. It is used to distinguish between
+   *                        streams in a Samza application so it must be 
unique in the context of one deployable unit.
+   *                        It does not need to be globally unique or unique 
with respect to a host.
    * @param topicName       The name of the topic.
    * @param systemName      The name of the System. See {@link 
org.apache.samza.system.SystemFactory}
    * @param partitionCount  The number of partitions.
    */
-  public KafkaStreamSpec(String topicName, String systemName, int 
partitionCount) {
-    this(topicName, topicName, systemName, partitionCount, 
DEFAULT_REPLICATION_FACTOR, new Properties());
+  public KafkaStreamSpec(String id, String topicName, String systemName, int 
partitionCount) {
+    this(id, topicName, systemName, partitionCount, 
DEFAULT_REPLICATION_FACTOR, new Properties());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/a4174309/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 2f82754..af77d5b 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -21,6 +21,7 @@ package org.apache.samza.system.kafka
 
 import java.util
 import java.util.{Properties, UUID}
+
 import kafka.admin.AdminUtils
 import kafka.api._
 import kafka.common.TopicAndPartition
@@ -31,6 +32,7 @@ import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadat
 import org.apache.samza.system._
 import org.apache.samza.util.{ClientUtilTopicMetadataStore, 
ExponentialSleepStrategy, KafkaUtil, Logging}
 import org.apache.samza.{Partition, SamzaException}
+
 import scala.collection.JavaConverters._
 
 
@@ -514,7 +516,7 @@ class KafkaSystemAdmin(
     * will auto-create a new topic.
     */
   override def validateChangelogStream(topicName: String, 
numKafkaChangelogPartitions: Int) = {
-    validateStream(new KafkaStreamSpec(topicName, systemName, 
numKafkaChangelogPartitions))
+    validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, 
systemName, numKafkaChangelogPartitions))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/a4174309/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index a47ba9d..ce59b40 100644
--- 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -132,6 +132,25 @@ public class TestKafkaSystemAdminJava extends 
TestKafkaSystemAdmin {
   }
 
   @Test
+  public void 
testValidateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() {
+    final String STREAM = "test.Change_Log.Validate";
+    Properties coordProps = new Properties();
+    Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
+    changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties()));
+
+    KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, 
Util.javaMapAsScalaMap(changeLogMap));
+    SystemAdmin admin = Mockito.spy(systemAdmin);
+    StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12);
+
+    admin.createChangelogStream(spec.getPhysicalName(), 
spec.getPartitionCount());
+    admin.validateStream(spec);
+    admin.validateChangelogStream(STREAM, spec.getPartitionCount()); // Should 
not throw
+
+    Mockito.verify(admin).createStream(Mockito.any());
+    Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any());
+  }
+
+  @Test
   public void testCreateStream() {
     SystemAdmin admin = this.basicSystemAdmin;
     StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);

Reply via email to