SAMZA-633; expose configuration to Kafka's encoder and decoder

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/76e151c1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/76e151c1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/76e151c1

Branch: refs/heads/samza-sql
Commit: 76e151c14a5d01b88c49edfb2c847ffe3c0e6242
Parents: cde495e
Author: Dan Harvey <[email protected]>
Authored: Thu Apr 2 16:07:56 2015 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Thu Apr 2 16:07:56 2015 -0700

----------------------------------------------------------------------
 .../apache/samza/config/KafkaSerdeConfig.scala  | 12 ++++++++++++
 .../apache/samza/serializers/KafkaSerde.scala   | 20 ++++++++++++++++++--
 .../samza/config/TestKafkaSerdeConfig.scala     |  7 +++++++
 3 files changed, 37 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/76e151c1/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
index 11078e3..ed2bd40 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
@@ -19,6 +19,10 @@
 
 package org.apache.samza.config
 
+import java.util.Properties
+
+import kafka.utils.VerifiableProperties
+
 object KafkaSerdeConfig {
   // kafka serde config constants
   val ENCODER = SerializerConfig.SERIALIZER_PREFIX + ".encoder"
@@ -33,4 +37,12 @@ class KafkaSerdeConfig(config: Config) extends ScalaMapConfig(config) {
 
   def getKafkaDecoder(serializer: String) =
     getOption(KafkaSerdeConfig.DECODER format serializer)
+
+  def getKafkaProperties(serializer: String) = {
+    val properties = new Properties();
+    val prefix = SerializerConfig.SERIALIZER_PREFIX format serializer + "."
+    properties.putAll(config.subset(prefix, true))
+
+    new VerifiableProperties(properties);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/76e151c1/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala b/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
index 82ba2a0..d626b79 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import org.apache.samza.util.Util
 import kafka.serializer.Encoder
 import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
 import org.apache.samza.config.Config
 import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
 import org.apache.samza.SamzaException
@@ -39,8 +40,23 @@ class KafkaSerdeFactory[T] extends SerdeFactory[T] {
     val decoderClassName = config
       .getKafkaDecoder(name)
       .getOrElse(throw new SamzaException("No kafka decoder defined for %s" format name))
-    val encoder = Util.getObj[Encoder[T]](encoderClassName)
-    val decoder = Util.getObj[Decoder[T]](decoderClassName)
+
+    val verifiableProperties = config.getKafkaProperties(name)
+
+    val encoder = getObj[Encoder[T]](encoderClassName, verifiableProperties);
+    val decoder = getObj[Decoder[T]](decoderClassName, verifiableProperties);
+
     new KafkaSerde(encoder, decoder)
   }
+
+  /**
+   * Instantiate className through its constructor that takes a VerifiableProperties argument.
+   */
+  private def getObj[T](className: String, properties: VerifiableProperties) = {
+    Class
+      .forName(className)
+      .getDeclaredConstructor(classOf[VerifiableProperties])
+      .newInstance(properties)
+      .asInstanceOf[T]
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/76e151c1/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
index 5cf82c2..d6899b8 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
@@ -38,4 +38,11 @@ class TestKafkaSerdeConfig {
     assertEquals(MAGIC_VAL, config.getKafkaEncoder("test").getOrElse(""))
     assertEquals(MAGIC_VAL, config.getKafkaDecoder("test").getOrElse(""))
   }
+
+  @Test
+  def testKafkaConfigurationProperties {
+    val properties = config.getKafkaProperties("test")
+    assertEquals(MAGIC_VAL, properties.getString("encoder"))
+    assertEquals(MAGIC_VAL, properties.getString("decoder"))
+  }
 }

Reply via email to