Repository: incubator-samza Updated Branches: refs/heads/master 40bbe4da5 -> 5aef56f4c
SAMZA-337; compress configs passed through environment variables in YARN Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5aef56f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5aef56f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5aef56f4 Branch: refs/heads/master Commit: 5aef56f4c169be354be1dda2ce6126cb67c83009 Parents: 40bbe4d Author: Chinmay Soman <[email protected]> Authored: Thu Jul 17 12:41:29 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Thu Jul 17 12:41:29 2014 -0700 ---------------------------------------------------------------------- build.gradle | 4 ++ gradle/dependency-versions.gradle | 1 + .../samza/config/ShellCommandConfig.scala | 8 ++++ .../apache/samza/container/SamzaContainer.scala | 30 +++++++++++++-- .../apache/samza/job/ShellCommandBuilder.scala | 21 +++++++++-- .../main/scala/org/apache/samza/util/Util.scala | 39 +++++++++++++++++++- .../scala/org/apache/samza/util/TestUtil.scala | 24 ++++++++++++ 7 files changed, 119 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 6d0ac97..b201f77 100644 --- a/build.gradle +++ b/build.gradle @@ -114,6 +114,10 @@ project(":samza-core_$scalaVersion") { compile "org.clapper:grizzled-slf4j_$scalaVersion:$grizzledVersion" compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" compile "org.codehaus.jackson:jackson-jaxrs:$jacksonVersion" + + // Temporary workaround to reduce config size via compression (SAMZA-337). Remove this + // once we figure out a clean way to do this. + compile "commons-codec:commons-codec:$commonsCodecVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 7373582..c8bd830 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -32,4 +32,5 @@ yarnVersion = "2.4.0" slf4jVersion = "1.6.2" guavaVersion = "17.0" + commonsCodecVersion = "1.9" } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala index 4c2d365..0cdc0d1 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala @@ -41,8 +41,14 @@ object ShellCommandConfig { */ val ENV_JAVA_OPTS = "JAVA_OPTS" + /** + * Specifies whether the config for ENV_CONFIG and ENV_SYSTEM_STREAMS are compressed or not. + */ + val ENV_COMPRESS_CONFIG = "SAMZA_COMPRESS_CONFIG" + val COMMAND_SHELL_EXECUTE = "task.execute" val TASK_JVM_OPTS = "task.opts" + val COMPRESS_ENV_CONFIG = "task.config.compress" implicit def Config2ShellCommand(config: Config) = new ShellCommandConfig(config) } @@ -51,4 +57,6 @@ class ShellCommandConfig(config: Config) extends ScalaMapConfig(config) { def getCommand = getOption(ShellCommandConfig.COMMAND_SHELL_EXECUTE).getOrElse("bin/run-container.sh") def getTaskOpts = getOption(ShellCommandConfig.TASK_JVM_OPTS) + + def isEnvConfigCompressed = getBoolean(ShellCommandConfig.COMPRESS_ENV_CONFIG, false) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index b303615..bff6000 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -65,13 +65,37 @@ import org.apache.samza.checkpoint.OffsetManager import org.apache.samza.system.StreamMetadataCache object SamzaContainer extends Logging { + + /** + * Get the decompressed parameter value for the given parameter (if necessary) + * @param param The parameter which is to be decompressed (if necessary) + * @return A valid parameter value + */ + def getParameter(param: String, isCompressed: Boolean) : String = { + if (isCompressed) { + info("Compression is ON !") + val decomressedParam = Util.decompress(param) + info("Got param = " + decomressedParam) + decomressedParam + } else { + info("Parameter is uncompressed. Using it as-is") + param + } + } + def main(args: Array[String]) { val jmxServer = new JmxServer val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME) - val configStr = System.getenv(ShellCommandConfig.ENV_CONFIG) - val config = JsonConfigSerializer.fromJson(configStr) - val encodedStreamsAndPartitions = System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS) + /** + * If the compressed option is enabled in config, de-compress the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS' + * properties. Note: This is a temporary workaround to reduce the size of the config and hence size + * of the environment variable(s) exported while starting a Samza container (SAMZA-337) + */ + val isCompressed = if (System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE")) true else false + val configStr = getParameter(System.getenv(ShellCommandConfig.ENV_CONFIG), isCompressed) + val config = JsonConfigSerializer.fromJson(configStr) + val encodedStreamsAndPartitions = getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed) val partitions = Util.deserializeSSPSetFromJSON(encodedStreamsAndPartitions) if (partitions.isEmpty) { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala index f8865b1..4635bb2 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala @@ -29,12 +29,25 @@ class ShellCommandBuilder extends CommandBuilder { def buildCommand() = config.getCommand def buildEnvironment(): java.util.Map[String, String] = { - val streamsAndPartsString = Util.serializeSSPSetToJSON(systemStreamPartitions.toSet) // Java to Scala set conversion - + var streamsAndPartsString = Util.serializeSSPSetToJSON(systemStreamPartitions.toSet) // Java to Scala set conversion + var envConfig = JsonConfigSerializer.toJson(config) + val isCompressed = if(config.isEnvConfigCompressed) "TRUE" else "FALSE" + + if (config.isEnvConfigCompressed) { + /** + * If the compressed option is enabled in config, compress the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS' + * properties. Note: This is a temporary workaround to reduce the size of the config and hence size + * of the environment variable(s) exported while starting a Samza container (SAMZA-337) + */ + streamsAndPartsString = Util.compress(streamsAndPartsString) + envConfig = Util.compress(envConfig) + } + Map( ShellCommandConfig.ENV_CONTAINER_NAME -> name, ShellCommandConfig.ENV_SYSTEM_STREAMS -> streamsAndPartsString, - ShellCommandConfig.ENV_CONFIG -> JsonConfigSerializer.toJson(config), - ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse("")) + ShellCommandConfig.ENV_CONFIG -> envConfig, + ShellCommandConfig.ENV_JAVA_OPTS -> config.getTaskOpts.getOrElse(""), + ShellCommandConfig.ENV_COMPRESS_CONFIG -> isCompressed) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 11c23d0..60d96c9 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -19,10 +19,12 @@ package org.apache.samza.util -import java.io.File +import java.io._ import java.lang.management.ManagementFactory import java.util.Random +import java.util.zip.{GZIPInputStream, GZIPOutputStream} import grizzled.slf4j.Logging +import org.apache.commons.codec.binary.Base64 import org.apache.samza.{ Partition, SamzaException } import org.apache.samza.config.Config import org.apache.samza.config.SystemConfig.Config2System @@ -180,4 +182,39 @@ object Util extends Logging { def getContainerPID(): String = { ManagementFactory.getRuntimeMXBean().getName() } + + /** + * Returns a compressed + base64 encoded representation of the given string + * @param origStr Original string to be processed + * @return A processed string after compression (gzip) and encoding (base64) + */ + def compress(origStr: String): String = { + val bos = new ByteArrayOutputStream() + val gzos = new GZIPOutputStream(bos) + gzos.write(origStr.getBytes()) + gzos.close() + val compressedStr = Base64.encodeBase64String(bos.toByteArray) + bos.close() + compressedStr + } + + /** + * Returns the original string from the given compressed string. This function assumes + * that the given string is compressed + base64 encode (using the above compress function). + * + * @param compresedStr Specified string to decompress + * @return Original string after decoding (base64) and uncompressing (gzip) + */ + def decompress(compresedStr: String) : String = { + val rawBytes = Base64.decodeBase64(compresedStr) + val gzis = new GZIPInputStream(new ByteArrayInputStream(rawBytes)) + val br = new BufferedReader(new InputStreamReader(gzis)) + val builder = new StringBuilder + var line = br.readLine() + while (line != null) { + builder.append(line) + line = br.readLine() + } + builder.toString() + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5aef56f4/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala index a67ecdf..5ed167d 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -29,7 +29,11 @@ import org.apache.samza.system.SystemFactory import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.config.Config +import scala.util.Random + class TestUtil { + val random = new Random(System.currentTimeMillis()) + @Test def testGetInputStreamPartitions { val expectedPartitionsPerStream = 1 @@ -94,6 +98,26 @@ class TestUtil { val backToStreamsAndParts = deserializeSSPSetFromJSON(asString) assertEquals(streamsAndParts, backToStreamsAndParts) } + + /** + * Generate a random alphanumeric string of the specified length + * @param length Specifies length of the string to generate + * @return An alphanumeric string + */ + def generateString (length : Int) : String = { + random.alphanumeric.take(length).mkString + } + + @Test + def testCompressAndDecompressUtility() { + var len : Integer = 0 + (10 until 1000).foreach(len => { + val sample = generateString(len) + val compressedStr = Util.compress(sample) + val deCompressedStr = Util.decompress(compressedStr) + assertEquals(sample, deCompressedStr) + }) + } } /**
