Repository: samza
Updated Branches:
  refs/heads/0.13.0 48b05c7d3 -> c74e03ca8


SAMZA-1317: Changelog topic configuration should accept streams.[streamId] configurations

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Prateek Maheshwari <pmahe...@linkedin.com>

Closes #212 from jmakes/samza-1317-migration-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c66c4877
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c66c4877
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c66c4877

Branch: refs/heads/0.13.0
Commit: c66c48770aacb65e3bf887e31c6efffb498cc9cf
Parents: 48b05c7
Author: Jacob Maes <jm...@linkedin.com>
Authored: Fri Jun 2 14:53:31 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Sun Jun 4 17:20:28 2017 -0700

----------------------------------------------------------------------
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  8 +++-
 .../system/kafka/TestKafkaSystemAdminJava.java  | 46 ++++++++++++++++----
 2 files changed, 43 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c66c4877/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 8c90c6c..2f82754 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -35,6 +35,10 @@ import scala.collection.JavaConverters._
 
 
 object KafkaSystemAdmin extends Logging {
+  // Use a dummy string for the stream id. The physical name and partition 
count are all that matter for changelog creation, so the dummy string should 
not be used.
+  // We cannot use the topic name, as it may include special chars which are 
not allowed in stream IDs. See SAMZA-1317
+  val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id"
+
   /**
    * A helper method that takes oldest, newest, and upcoming offsets for each
    * system stream partition, and creates a single map from stream name to
@@ -490,10 +494,10 @@ class KafkaSystemAdmin(
   class KafkaChangelogException(s: String, t: Throwable) extends 
SamzaException(s, t) {
     def this(s: String) = this(s, null)
   }
-
+  
   override def createChangelogStream(topicName: String, 
numKafkaChangelogPartitions: Int) = {
     val topicMeta = topicMetaInformation.getOrElse(topicName, throw new 
KafkaChangelogException("Unable to find topic information for topic " + 
topicName))
-    val spec = new KafkaStreamSpec(topicName, topicName, systemName, 
numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
+    val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, 
numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
 
     if (createStream(spec)) {
       info("Created changelog stream %s." format topicName)

http://git-wip-us.apache.org/repos/asf/samza/blob/c66c4877/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index f5bc73a..a47ba9d 100644
--- 
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ 
b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.system.kafka;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
@@ -27,13 +30,7 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 
 public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
@@ -67,7 +64,38 @@ public class TestKafkaSystemAdminJava extends 
TestKafkaSystemAdmin {
     changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
 
     SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, 
Util.javaMapAsScalaMap(changeLogMap)));
-    StreamSpec spec = new StreamSpec(STREAM, STREAM, SYSTEM(), PARTITIONS);
+    StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), 
STREAM, SYSTEM(), PARTITIONS);
+    admin.createChangelogStream(STREAM, PARTITIONS);
+    admin.validateStream(spec);
+
+    ArgumentCaptor<StreamSpec> specCaptor = 
ArgumentCaptor.forClass(StreamSpec.class);
+    Mockito.verify(admin).createStream(specCaptor.capture());
+
+    StreamSpec internalSpec = specCaptor.getValue();
+    assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec 
is used to carry replication factor
+    assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId());
+    assertEquals(SYSTEM(), internalSpec.getSystemName());
+    assertEquals(STREAM, internalSpec.getPhysicalName());
+    assertEquals(REP_FACTOR, ((KafkaStreamSpec) 
internalSpec).getReplicationFactor());
+    assertEquals(PARTITIONS, internalSpec.getPartitionCount());
+    assertEquals(changeLogProps, ((KafkaStreamSpec) 
internalSpec).getProperties());
+  }
+
+  @Test
+  public void 
testCreateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() {
+    final String STREAM = "test.Change_Log.Stream";
+    final int PARTITIONS = 12;
+    final int REP_FACTOR = 3;
+
+    Properties coordProps = new Properties();
+    Properties changeLogProps = new Properties();
+    changeLogProps.setProperty("cleanup.policy", "compact");
+    changeLogProps.setProperty("segment.bytes", "139");
+    Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
+    changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
+
+    SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, 
Util.javaMapAsScalaMap(changeLogMap)));
+    StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), 
STREAM, SYSTEM(), PARTITIONS);
     admin.createChangelogStream(STREAM, PARTITIONS);
     admin.validateStream(spec);
 
@@ -76,7 +104,7 @@ public class TestKafkaSystemAdminJava extends 
TestKafkaSystemAdmin {
 
     StreamSpec internalSpec = specCaptor.getValue();
     assertTrue(internalSpec instanceof KafkaStreamSpec);  // KafkaStreamSpec 
is used to carry replication factor
-    assertEquals(STREAM, internalSpec.getId());
+    assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId());
     assertEquals(SYSTEM(), internalSpec.getSystemName());
     assertEquals(STREAM, internalSpec.getPhysicalName());
     assertEquals(REP_FACTOR, ((KafkaStreamSpec) 
internalSpec).getReplicationFactor());

Reply via email to