SAMZA-633; expose configuration to Kafka's encoder and decoder
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/76e151c1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/76e151c1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/76e151c1

Branch: refs/heads/samza-sql
Commit: 76e151c14a5d01b88c49edfb2c847ffe3c0e6242
Parents: cde495e
Author: Dan Harvey <[email protected]>
Authored: Thu Apr 2 16:07:56 2015 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Thu Apr 2 16:07:56 2015 -0700

----------------------------------------------------------------------
 .../apache/samza/config/KafkaSerdeConfig.scala | 12 ++++++++++++
 .../apache/samza/serializers/KafkaSerde.scala | 20 ++++++++++++++++++--
 .../samza/config/TestKafkaSerdeConfig.scala | 7 +++++++
 3 files changed, 37 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/76e151c1/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
index 11078e3..ed2bd40 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaSerdeConfig.scala
@@ -19,6 +19,10 @@
 
 package org.apache.samza.config
 
+import java.util.Properties
+
+import kafka.utils.VerifiableProperties
+
 object KafkaSerdeConfig {
   // kafka serde config constants
   val ENCODER = SerializerConfig.SERIALIZER_PREFIX + ".encoder"
@@ -33,4 +37,12 @@ class KafkaSerdeConfig(config: Config) extends ScalaMapConfig(config) {
 
   def getKafkaDecoder(serializer: String) =
     getOption(KafkaSerdeConfig.DECODER format serializer)
+
+  def getKafkaProperties(serializer: String) = {
+    val properties = new Properties();
+    val prefix = SerializerConfig.SERIALIZER_PREFIX format serializer + "."
+    properties.putAll(config.subset(prefix, true))
+
+    new VerifiableProperties(properties);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/76e151c1/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala b/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
index 82ba2a0..d626b79 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/serializers/KafkaSerde.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import org.apache.samza.util.Util
 import kafka.serializer.Encoder
 import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
 import org.apache.samza.config.Config
 import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
 import org.apache.samza.SamzaException
@@ -39,8 +40,23 @@ class KafkaSerdeFactory[T] extends SerdeFactory[T] {
     val decoderClassName = config
       .getKafkaDecoder(name)
       .getOrElse(throw new SamzaException("No kafka decoder defined for %s" format name))
-    val encoder = Util.getObj[Encoder[T]](encoderClassName)
-    val decoder = Util.getObj[Decoder[T]](decoderClassName)
+
+    val verifiableProperties = config.getKafkaProperties(name)
+
+    val encoder = getObj[Encoder[T]](encoderClassName, verifiableProperties);
+    val decoder = getObj[Decoder[T]](decoderClassName, verifiableProperties);
+
     new KafkaSerde(encoder, decoder)
   }
+
+  /**
+   * Instantiate a class instance from a given className and properties.
+   */
+  private def getObj[T](className: String, properties: VerifiableProperties) = {
+    Class
+      .forName(className)
+      .getDeclaredConstructor(classOf[VerifiableProperties])
+      .newInstance(properties)
+      .asInstanceOf[T]
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/76e151c1/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
index 5cf82c2..d6899b8 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
@@ -38,4 +38,11 @@ class TestKafkaSerdeConfig {
     assertEquals(MAGIC_VAL, config.getKafkaEncoder("test").getOrElse(""))
     assertEquals(MAGIC_VAL, config.getKafkaDecoder("test").getOrElse(""))
   }
+
+  @Test
+  def testKafkaConfigurationProperties {
+    val properties = config.getKafkaProperties("test")
+    assertEquals(MAGIC_VAL, properties.getString("encoder"))
+    assertEquals(MAGIC_VAL, properties.getString("decoder"))
+  }
 }
