Repository: samza Updated Branches: refs/heads/master d7fc811d6 -> 9db47b861
http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 7e9f18a..ae6330f 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -25,7 +25,7 @@ import java.util.regex.Pattern import org.apache.samza.util.Util import org.apache.samza.util.Logging -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import kafka.consumer.ConsumerConfig import java.util.{Properties, UUID} @@ -102,6 +102,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getFetchMessageMaxBytesTopics(systemName: String) = { val subConf = config.subset("systems.%s.streams." format systemName, true) subConf + .asScala .filterKeys(k => k.endsWith(".consumer.fetch.message.max.bytes")) .map { case (fetchMessageMaxBytes, fetchSizeValue) => @@ -116,6 +117,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getAutoOffsetResetTopics(systemName: String) = { val subConf = config.subset("systems.%s.streams." format systemName, true) subConf + .asScala .filterKeys(k => k.endsWith(".consumer.auto.offset.reset")) .map { case (topicAutoOffsetReset, resetValue) => @@ -162,7 +164,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE) kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name))) - filteredConfigs.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) } + filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) } kafkaChangeLogProperties } @@ -177,7 +179,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { consumerProps.putAll(subConf) consumerProps.put("group.id", groupId) consumerProps.put("client.id", clientId) - consumerProps.putAll(injectedProps) + consumerProps.putAll(injectedProps.asJava) new ConsumerConfig(consumerProps) } @@ -189,7 +191,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { val producerProps = new util.HashMap[String, Object]() producerProps.putAll(subConf) producerProps.put("client.id", clientId) - producerProps.putAll(injectedProps) + producerProps.putAll(injectedProps.asJava) new KafkaProducerConfig(systemName, clientId, producerProps) } } http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala index 4e3b247..6dc2f82 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala @@ -24,12 +24,11 @@ import kafka.utils.ZkUtils import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS } import org.apache.samza.SamzaException import org.apache.samza.util.Util -import collection.JavaConversions._ +import collection.JavaConverters._ import org.apache.samza.util.Logging import scala.collection._ import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.system.SystemStream -import scala.util.Sorting /** * Dynamically determine the Kafka topics to use as input streams to the task via a regular expression. @@ -80,6 +79,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging { // For each topic that matched, generate all the specified configs config .getRegexResolvedInheritedConfig(rewriterName) + .asScala .foreach(kv => keysAndValsToAdd.put("systems." + m.getSystem + ".streams." + m.getStream + "." + kv._1, kv._2)) } // Build new inputs @@ -92,7 +92,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging { .sortWith(_ < _) .mkString(",") - new MapConfig((keysAndValsToAdd ++ config) += inputStreams) + new MapConfig(((keysAndValsToAdd ++ config.asScala) += inputStreams).asJava) } def getTopicsFromZK(rewriterName: String, config: Config): Seq[String] = { http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 539a439..5338886 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -23,7 +23,6 @@ package org.apache.samza.system.kafka import java.lang.Thread.UncaughtExceptionHandler import java.nio.channels.ClosedByInterruptException -import java.util.Map.Entry import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} import kafka.api._ @@ -35,9 +34,8 @@ import org.apache.samza.util.ExponentialSleepStrategy import org.apache.samza.util.Logging import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.concurrent -import scala.collection.mutable import org.apache.samza.util.KafkaUtil /** @@ -71,7 +69,7 @@ class BrokerProxy( val sleepMSWhileNoTopicPartitions = 100 /** What's the next offset for a particular partition? **/ - val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]() + val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]().asScala /** Block on the first call to get message if the fetcher has not yet returned its initial results **/ // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but @@ -95,7 +93,7 @@ class BrokerProxy( def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = { debug("Adding new topic and partition %s to queue for %s" format (tp, host)) - if (nextOffsets.containsKey(tp)) { + if (nextOffsets.asJava.containsKey(tp)) { toss("Already consuming TopicPartition %s" format tp) } @@ -113,13 +111,13 @@ class BrokerProxy( nextOffsets += tp -> offset - metrics.topicPartitions(host, port).set(nextOffsets.size) + metrics.topicPartitions.get((host, port)).set(nextOffsets.size) } def removeTopicPartition(tp: TopicAndPartition) = { - if (nextOffsets.containsKey(tp)) { + if (nextOffsets.asJava.containsKey(tp)) { val offset = nextOffsets.remove(tp) - metrics.topicPartitions(host, port).set(nextOffsets.size) + metrics.topicPartitions.get((host, port)).set(nextOffsets.size) debug("Removed %s" format tp) offset } else { @@ -136,7 +134,7 @@ class BrokerProxy( (new ExponentialSleepStrategy).run( loop => { if (reconnect) { - metrics.reconnects(host, port).inc + metrics.reconnects.get((host, port)).inc simpleConsumer.close() simpleConsumer = createSimpleConsumer() } @@ -178,23 +176,23 @@ class BrokerProxy( val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList if (topicAndPartitionsToFetch.size > 0) { - metrics.brokerReads(host, port).inc + metrics.brokerReads.get((host, port)).inc val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*) firstCall = false firstCallBarrier.countDown() // Split response into errors and non errors, processing the errors first - val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError) + val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error == ErrorMapping.NoError) handleErrors(errorResponses, response) - nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) } + nonErrorResponses.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) } } else { refreshLatencyMetrics debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions)) - metrics.brokerSkippedFetchRequests(host, port).inc + metrics.brokerSkippedFetchRequests.get((host, port)).inc Thread.sleep(sleepMSWhileNoTopicPartitions) } @@ -221,7 +219,7 @@ class BrokerProxy( immutableNextOffsetsCopy.keySet.foreach(abdicate(_)) } - def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = { + def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response:FetchResponse) = { // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves case class Error(tp: TopicAndPartition, code: Short, exception: Throwable) @@ -229,10 +227,10 @@ class BrokerProxy( // Convert FetchResponse into easier-to-work-with Errors val errors = for ( - error <- errorResponses; - errorCode <- Option(response.errorCode(error.getKey.topic, error.getKey.partition)); // Scala's being cranky about referring to error.getKey values... + (topicAndPartition, responseData) <- errorResponses; + errorCode <- Option(response.errorCode(topicAndPartition.topic, topicAndPartition.partition)); // Scala's being cranky about referring to error.getKey values... exception <- Option(ErrorMapping.exceptionFor(errorCode)) - ) yield new Error(error.getKey, errorCode, exception) + ) yield new Error(topicAndPartition, errorCode, exception) val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode } val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode) @@ -274,10 +272,10 @@ class BrokerProxy( nextOffset = message.nextOffset val bytesSize = message.message.payloadSize + message.message.keySize - metrics.reads(tp).inc - metrics.bytesRead(tp).inc(bytesSize) - metrics.brokerBytesRead(host, port).inc(bytesSize) - metrics.offsets(tp).set(nextOffset) + metrics.reads.get(tp).inc + metrics.bytesRead.get(tp).inc(bytesSize) + metrics.brokerBytesRead.get((host, port)).inc(bytesSize) + metrics.offsets.get(tp).set(nextOffset) } nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching. @@ -285,8 +283,8 @@ class BrokerProxy( // Update high water mark val hw = data.hw if (hw >= 0) { - metrics.highWatermark(tp).set(hw) - metrics.lag(tp).set(hw - nextOffset) + metrics.highWatermark.get(tp).set(hw) + metrics.lag.get(tp).set(hw - nextOffset) } else { debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp)) } @@ -327,10 +325,10 @@ class BrokerProxy( if (latestOffset >= 0) { // only update the registered topicAndpartitions if(metrics.highWatermark.containsKey(topicAndPartition)) { - metrics.highWatermark(topicAndPartition).set(latestOffset) + metrics.highWatermark.get(topicAndPartition).set(latestOffset) } if(metrics.lag.containsKey(topicAndPartition)) { - metrics.lag(topicAndPartition).set(latestOffset - offset) + metrics.lag.get(topicAndPartition).set(latestOffset - offset) } } } http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 309b653..8c90c6c 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -21,20 +21,17 @@ package org.apache.samza.system.kafka import java.util import java.util.{Properties, UUID} - import kafka.admin.AdminUtils import kafka.api._ -import kafka.common.{TopicAndPartition, TopicExistsException} +import kafka.common.TopicAndPartition import kafka.consumer.{ConsumerConfig, SimpleConsumer} import kafka.utils.ZkUtils -import org.apache.samza.config.KafkaConfig +import org.apache.kafka.common.errors.TopicExistsException import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import org.apache.samza.system._ import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging} import org.apache.samza.{Partition, SamzaException} - -import scala.collection.JavaConversions -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ object KafkaSystemAdmin extends Logging { @@ -59,7 +56,7 @@ object KafkaSystemAdmin extends Logging { (systemStreamPartition.getPartition, partitionMetadata) }) .toMap - val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata) + val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata.asJava) (streamName, streamMetadata) } .toMap @@ -151,7 +148,7 @@ class KafkaSystemAdmin( retryBackoff.run( loop => { val metadata = TopicMetadataCache.getTopicMetadata( - streams.toSet, + streams.asScala.toSet, systemName, getTopicMetadata, metadataTTL) @@ -162,11 +159,11 @@ class KafkaSystemAdmin( pm => new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "") }.toMap[Partition, SystemStreamPartitionMetadata] - (topic -> new SystemStreamMetadata(topic, partitionsMap)) + (topic -> new SystemStreamMetadata(topic, partitionsMap.asJava)) } } loop.done - JavaConversions.mapAsJavaMap(result) + result.asJava }, (exception, loop) => { @@ -188,11 +185,11 @@ class KafkaSystemAdmin( // This is safe to do with Kafka, even if a topic is key-deduped. If the // offset doesn't exist on a compacted topic, Kafka will return the first // message AFTER the offset that was specified in the fetch request. - offsets.mapValues(offset => (offset.toLong + 1).toString) + offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava } override def getSystemStreamMetadata(streams: java.util.Set[String]) = - getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500)) + getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500)).asJava /** * Given a set of stream names (topics), fetch metadata from Kafka for each @@ -207,7 +204,7 @@ class KafkaSystemAdmin( retryBackoff.run( loop => { val metadata = TopicMetadataCache.getTopicMetadata( - streams.toSet, + streams.asScala.toSet, systemName, getTopicMetadata, metadataTTL) @@ -241,12 +238,7 @@ class KafkaSystemAdmin( debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition) newestOffsets -= topicAndPartition debug("Setting oldest offset to 0 to consume from beginning") - oldestOffsets.get(topicAndPartition) match { - case Some(s) => - oldestOffsets.updated(topicAndPartition, "0") - case None => - oldestOffsets.put(topicAndPartition, "0") - } + oldestOffsets += (topicAndPartition -> "0") } } } finally { http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index fa685ee..f25bb68 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -26,7 +26,6 @@ import kafka.message.MessageAndOffset import org.apache.samza.Partition import org.apache.kafka.common.utils.Utils import org.apache.samza.util.Clock -import java.util.UUID import kafka.serializer.DefaultDecoder import kafka.serializer.Decoder import org.apache.samza.util.BlockingEnvelopeMap @@ -37,7 +36,7 @@ import org.apache.samza.util.TopicMetadataStore import kafka.api.TopicMetadata import org.apache.samza.util.ExponentialSleepStrategy import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.samza.system.SystemAdmin object KafkaSystemConsumer { @@ -133,7 +132,7 @@ private[kafka] class KafkaSystemConsumer( type HostPort = (String, Int) val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]() - val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]() + val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]().asScala var perPartitionFetchThreshold = fetchThreshold var perPartitionFetchThresholdBytes = 0L http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala index 6efd2dc..b680ed4 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala @@ -36,7 +36,7 @@ import org.apache.samza.util.KafkaUtil import org.apache.samza.util.Logging import org.apache.samza.util.TimerUtils -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ class KafkaSystemProducer(systemName: String, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, @@ -85,7 +85,7 @@ class KafkaSystemProducer(systemName: String, } currentProducer.close - sources.foreach {p => + sources.asScala.foreach {p => if (p._2.exceptionInCallback.get() == null) { flush(p._1) } http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index 0f0bc22..41d380b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -23,13 +23,13 @@ import java.util.Properties import java.util.concurrent.atomic.AtomicLong import kafka.admin.AdminUtils import kafka.utils.ZkUtils -import org.apache.kafka.clients.producer.{Producer, ProducerRecord} import org.apache.kafka.common.PartitionInfo import org.apache.samza.config.Config import org.apache.samza.config.ConfigException import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.system.OutgoingMessageEnvelope -import kafka.common.{TopicExistsException, ErrorMapping, ReplicaNotAvailableException} +import kafka.common.{ErrorMapping, ReplicaNotAvailableException} +import org.apache.kafka.common.errors.TopicExistsException import org.apache.samza.system.kafka.TopicMetadataCache object KafkaUtil extends Logging { http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index 1f2f62f..a14812e 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -39,7 +39,7 @@ import org.apache.samza.{Partition, SamzaException} import org.junit.Assert._ import org.junit._ -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection._ class TestKafkaCheckpointManager extends KafkaServerTestHarness { @@ -59,8 +59,8 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { val partition = new Partition(0) val partition2 = new Partition(1) - val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123")) - val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345")) + val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123").asJava) + val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345").asJava) var producerConfig: KafkaProducerConfig = null @@ -82,7 +82,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { config.put("acks", "all") config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString) - config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES) + config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES.asJava) producerConfig = new KafkaProducerConfig("kafka", "i001", config) metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name") @@ -234,7 +234,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, failOnCheckpointValidation = failOnTopicValidation, - checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]()))) + checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava))) // CheckpointManager with a specific checkpoint topic private def getKafkaCheckpointManager = getKafkaCheckpointManagerWithParam(checkpointTopic) @@ -254,7 +254,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, failOnCheckpointValidation = failOnTopicValidation, serde = new InvalideSerde(exception), - checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]()))) + checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava))) class InvalideSerde(exception: String) extends CheckpointSerde { override def fromBytes(bytes: Array[Byte]): Checkpoint = { http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index d626f1c..555ab9f 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -19,18 +19,14 @@ package org.apache.samza.config -import java.net.URI -import java.io.File import java.util.Properties -import kafka.consumer.ConsumerConfig import org.apache.samza.config.factories.PropertiesConfigFactory import org.junit.Assert._ import org.junit.Test -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.clients.producer.ProducerConfig import org.junit.Before -import org.junit.BeforeClass class TestKafkaConfig { @@ -52,7 +48,7 @@ class TestKafkaConfig { val factory = new PropertiesConfigFactory() props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory") - val mapConfig = new MapConfig(props.toMap[String, String]) + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId1") @@ -85,33 +81,33 @@ class TestKafkaConfig { @Test def testStreamLevelFetchSizeOverride() { - val mapConfig = new MapConfig(props.toMap[String, String]) + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID) // default fetch size assertEquals(1024*1024, consumerConfig.fetchMessageMaxBytes) props.setProperty("systems." + SYSTEM_NAME + ".consumer.fetch.message.max.bytes", "262144") - val mapConfig1 = new MapConfig(props.toMap[String, String]) + val mapConfig1 = new MapConfig(props.asScala.asJava) val kafkaConfig1 = new KafkaConfig(mapConfig1) val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID) // shared fetch size assertEquals(512*512, consumerConfig1.fetchMessageMaxBytes) props.setProperty("systems." + SYSTEM_NAME + ".streams.topic1.consumer.fetch.message.max.bytes", "65536") - val mapConfig2 = new MapConfig(props.toMap[String, String]) + val mapConfig2 = new MapConfig(props.asScala.asJava) val kafkaConfig2 = new KafkaConfig(mapConfig2) val consumerConfig2 = kafkaConfig2.getFetchMessageMaxBytesTopics(SYSTEM_NAME) // topic fetch size assertEquals(256*256, consumerConfig2 getOrElse ("topic1", 1024*1024)) // default samza.fetch.threshold.bytes - val mapConfig3 = new MapConfig(props.toMap[String, String]) + val mapConfig3 = new MapConfig(props.asScala.asJava) val kafkaConfig3 = new KafkaConfig(mapConfig3) assertTrue(kafkaConfig3.getConsumerFetchThresholdBytes("kafka").isEmpty) props.setProperty("systems.kafka.samza.fetch.threshold.bytes", "65536") - val mapConfig4 = new MapConfig(props.toMap[String, String]) + val mapConfig4 = new MapConfig(props.asScala.asJava) val kafkaConfig4 = new KafkaConfig(mapConfig4) assertEquals("65536", kafkaConfig4.getConsumerFetchThresholdBytes("kafka").get) } @@ -125,7 +121,7 @@ class TestKafkaConfig { props.setProperty("stores.test3.changelog", "otherstream") props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete") - val mapConfig = new MapConfig(props.toMap[String, String]) + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) assertEquals(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete") assertEquals(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact") @@ -138,7 +134,7 @@ class TestKafkaConfig { @Test def testDefaultValuesForProducerProperties() { - val mapConfig = new MapConfig(props.toMap[String, String]) + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID) val producerProperties = kafkaProducerConfig.getProducerProperties @@ -155,7 +151,7 @@ class TestKafkaConfig { props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, expectedValue); - val mapConfig = new MapConfig(props.toMap[String, String]) + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID) val producerProperties = kafkaProducerConfig.getProducerProperties @@ -169,7 +165,7 @@ class TestKafkaConfig { props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, expectedValue); - val mapConfig = new MapConfig(props.toMap[String, String]) + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID) val producerProperties = kafkaProducerConfig.getProducerProperties @@ -181,7 +177,7 @@ class TestKafkaConfig { def testMaxInFlightRequestsPerConnectionWrongNumberFormat() { props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "Samza"); - val mapConfig = new MapConfig(props.toMap[String, String]) + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID) kafkaProducerConfig.getProducerProperties @@ -191,7 +187,7 @@ class TestKafkaConfig { def testRetriesWrongNumberFormat() { props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, "Samza"); - val mapConfig = new MapConfig(props.toMap[String, String]) + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID) kafkaProducerConfig.getProducerProperties http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala index d6899b8..3871560 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala @@ -23,7 +23,7 @@ import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde import org.junit.Assert._ import org.junit.Test -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ class TestKafkaSerdeConfig { val MAGIC_VAL = "1000" @@ -31,7 +31,7 @@ class TestKafkaSerdeConfig { val paramsToTest = List( "serializers.registry.test.encoder", "serializers.registry.test.decoder") - val config = new MapConfig(mapAsJavaMap(paramsToTest.map { m => (m, MAGIC_VAL) }.toMap)) + val config = new MapConfig(paramsToTest.map { m => (m, MAGIC_VAL) }.toMap.asJava) @Test def testKafkaConfigurationIsBackwardsCompatible { http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala index 89ced34..69d7da6 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala @@ -19,9 +19,8 @@ package org.apache.samza.config -import collection.JavaConversions._ +import collection.JavaConverters._ -import org.apache.samza.SamzaException import org.junit.Assert._ import org.junit.Test @@ -45,7 +44,7 @@ class TestRegExTopicGenerator { getRegexConfigInherited + ".b.triumph" -> "spitfire", unrelated) - val config = new MapConfig(map) + val config = new MapConfig(map.asJava) // Don't actually talk to ZooKeeper val rewriter = new RegExTopicGenerator() { @@ -83,7 +82,7 @@ class TestRegExTopicGenerator { override def getTopicsFromZK(rewriterName: String, config: Config): Seq[String] = List("yoyoyo") } - val config = rewriter.rewrite(REWRITER_NAME, new MapConfig(map)) + val config = rewriter.rewrite(REWRITER_NAME, new MapConfig(map.asJava)) assertEquals("test.yoyoyo", config.get(TaskConfig.INPUT_STREAMS)) } } http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index cc7077c..f0bdafd 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -37,7 +37,7 @@ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.mockito.{Matchers, Mockito} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ class TestBrokerProxy extends Logging { val tp2 = new TopicAndPartition("Redbird", 2013) @@ -52,8 +52,8 @@ class TestBrokerProxy extends Logging { bp.addTopicPartition(tp2, Option("0")) Thread.sleep(1000) assertEquals(2, sink.receivedMessages.size) - assertEquals(42, sink.receivedMessages.get(0)._2.offset) - assertEquals(84, sink.receivedMessages.get(1)._2.offset) + assertEquals(42, sink.receivedMessages(0)._2.offset) + assertEquals(84, sink.receivedMessages(1)._2.offset) } @Test def brokerProxySkipsFetchForEmptyRequests() = { @@ -64,8 +64,8 @@ class TestBrokerProxy extends Logging { bp.addTopicPartition(tp2, Option("0")) Thread.sleep(1000) assertEquals(0, sink.receivedMessages.size) - assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, bp.port).getCount > 0) - assertEquals(0, bp.metrics.brokerReads(bp.host, bp.port).getCount) + assertTrue(bp.metrics.brokerSkippedFetchRequests.get((bp.host, bp.port)).getCount > 0) + assertEquals(0, bp.metrics.brokerReads.get((bp.host, bp.port)).getCount) } @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = { @@ -91,7 +91,7 @@ class TestBrokerProxy extends Logging { def refreshDropped() {} def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { - receivedMessages.add((tp, msg, msg.offset.equals(highWatermark))) + receivedMessages += ((tp, msg, msg.offset.equals(highWatermark))) } def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) { @@ -109,7 +109,7 @@ class TestBrokerProxy extends Logging { metrics.registerBrokerProxy(host, port) metrics.registerTopicAndPartition(tp) - metrics.topicPartitions(host, port).set(1) + metrics.topicPartitions.get((host, port)).set(1) val bp = new BrokerProxy( host, @@ -168,7 +168,7 @@ class TestBrokerProxy extends Logging { val fetchResponsePartitionData = FetchResponsePartitionData(0, 500, messageSet) val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData) - when(fetchResponse.data).thenReturn(map) + when(fetchResponse.data).thenReturn(map.toSeq) when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet) fetchResponse } @@ -210,14 +210,14 @@ class TestBrokerProxy extends Logging { bp.addTopicPartition(tp, Option("0")) Thread.sleep(1000) // update when fetching messages - assertEquals(500, bp.metrics.highWatermark(tp).getValue) - assertEquals(415, bp.metrics.lag(tp).getValue) + assertEquals(500, bp.metrics.highWatermark.get(tp).getValue) + assertEquals(415, bp.metrics.lag.get(tp).getValue) fetchTp1 = false Thread.sleep(1000) // update when not fetching messages - assertEquals(100, bp.metrics.highWatermark(tp).getValue) - assertEquals(15, bp.metrics.lag(tp).getValue) + assertEquals(100, bp.metrics.highWatermark.get(tp).getValue) + assertEquals(15, bp.metrics.lag.get(tp).getValue) fetchTp1 = true } @@ -264,7 +264,7 @@ class TestBrokerProxy extends Logging { val response = mock(classOf[FetchResponsePartitionData]) when(response.error).thenReturn(ErrorMapping.OffsetOutOfRangeCode) val responseMap = Map(tp -> response) - when(mfr.data).thenReturn(responseMap) + when(mfr.data).thenReturn(responseMap.toSeq) invocationCount += 1 mfr } else { http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index be7db97..19f3903 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -39,7 +39,7 @@ import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStra import org.junit.Assert._ import org.junit._ -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ /** * README: New tests should be added to the Java tests. See TestKafkaSystemAdminJava @@ -203,11 +203,11 @@ class TestKafkaSystemAdmin { validateTopic(TOPIC, 50) // Verify the empty topic behaves as expected. - var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC)) + var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava) assertEquals(1, metadata.size) - assertNotNull(metadata(TOPIC)) + assertNotNull(metadata.get(TOPIC)) // Verify partition count. - var sspMetadata = metadata(TOPIC).getSystemStreamPartitionMetadata + var sspMetadata = metadata.get(TOPIC).getSystemStreamPartitionMetadata assertEquals(50, sspMetadata.size) // Empty topics should have null for latest offset and 0 for earliest offset assertEquals("0", sspMetadata.get(new Partition(0)).getOldestOffset) @@ -218,11 +218,11 @@ class TestKafkaSystemAdmin { // Add a new message to one of the partitions, and verify that it works as // expected. producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val1".getBytes)).get() - metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC)) + metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava) assertEquals(1, metadata.size) - val streamName = metadata.keySet.head + val streamName = metadata.keySet.asScala.head assertEquals(TOPIC, streamName) - sspMetadata = metadata(streamName).getSystemStreamPartitionMetadata + sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata // key1 gets hash-mod'd to partition 48. assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset) assertEquals("0", sspMetadata.get(new Partition(48)).getNewestOffset) @@ -234,10 +234,10 @@ class TestKafkaSystemAdmin { // Add a second message to one of the same partition. producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val2".getBytes)).get() - metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC)) + metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava) assertEquals(1, metadata.size) assertEquals(TOPIC, streamName) - sspMetadata = metadata(streamName).getSystemStreamPartitionMetadata + sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata // key1 gets hash-mod'd to partition 48. assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset) assertEquals("1", sspMetadata.get(new Partition(48)).getNewestOffset) @@ -245,7 +245,7 @@ class TestKafkaSystemAdmin { // Validate that a fetch will return the message. val connector = getConsumerConnector - var stream = connector.createMessageStreams(Map(TOPIC -> 1)).get(TOPIC).get.get(0).iterator + var stream = connector.createMessageStreams(Map(TOPIC -> 1))(TOPIC).head.iterator var message = stream.next var text = new String(message.message, "UTF-8") connector.shutdown @@ -261,10 +261,10 @@ class TestKafkaSystemAdmin { @Test def testNonExistentTopic { - val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic")) - val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata")) + val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic").asJava) + val metadata = initialOffsets.asScala.getOrElse("non-existent-topic", fail("missing metadata")) assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map( - new Partition(0) -> new SystemStreamPartitionMetadata("0", null, "0")))) + new Partition(0) -> new SystemStreamPartitionMetadata("0", null, "0")).asJava)) } @Test @@ -273,9 +273,9 @@ class TestKafkaSystemAdmin { val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1)) val offsetsAfter = systemAdmin.getOffsetsAfter(Map( ssp1 -> "1", - ssp2 -> "2")) - assertEquals("2", offsetsAfter(ssp1)) - assertEquals("3", offsetsAfter(ssp2)) + ssp2 -> "2").asJava) + assertEquals("2", offsetsAfter.get(ssp1)) + assertEquals("3", offsetsAfter.get(ssp2)) } @Test @@ -310,7 +310,7 @@ class TestKafkaSystemAdmin { val systemAdmin = new KafkaSystemAdminWithTopicMetadataError val retryBackoff = new ExponentialSleepStrategy.Mock(maxCalls = 3) try { - systemAdmin.getSystemStreamMetadata(Set("quux"), retryBackoff) + systemAdmin.getSystemStreamMetadata(Set("quux").asJava, retryBackoff) fail("expected CallLimitReached to be thrown") } catch { case e: ExponentialSleepStrategy.CallLimitReached => () http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala index ce84b6d..c333935 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala @@ -23,11 +23,10 @@ import org.apache.samza.SamzaException import org.apache.samza.config.MapConfig import org.apache.samza.config.StorageConfig import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.system.SystemStream import org.junit.Assert._ import org.junit.Test -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ class TestKafkaSystemFactory { @Test @@ -36,7 +35,7 @@ class TestKafkaSystemFactory { try { producerFactory.getProducer( "test", - new MapConfig(Map[String, String]()), + new MapConfig(Map[String, String]().asJava), new MetricsRegistryMap) fail("Expected to get a Samza exception.") } catch { @@ -49,7 +48,7 @@ class TestKafkaSystemFactory { def testFailWhenSerdeIsInvalid { val producerFactory = new KafkaSystemFactory val config = new MapConfig(Map[String, String]( - "streams.test.serde" -> "failme")) + "streams.test.serde" -> "failme").asJava) try { producerFactory.getProducer( "test", @@ -70,7 +69,7 @@ class TestKafkaSystemFactory { "systems.test.producer.bootstrap.servers" -> "", "systems.test.samza.key.serde" -> "json", "systems.test.samza.msg.serde" -> "json", - "serializers.registry.json.class" -> "samza.serializers.JsonSerdeFactory")) + "serializers.registry.json.class" -> "samza.serializers.JsonSerdeFactory").asJava) var producer = producerFactory.getProducer( "test", config, @@ -91,7 +90,7 @@ class TestKafkaSystemFactory { StorageConfig.FACTORY.format("system1") -> "some.factory.Class", StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1", StorageConfig.FACTORY.format("system2") -> "some.factory.Class") - val config = new MapConfig(configMap) + val config = new MapConfig(configMap.asJava) assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config)) assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config)) assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config)) http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 5112ac6..c771788 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -205,14 +205,15 @@ class RocksDbKeyValueStore( class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] { private var open = true private var firstValueAccessed = false - def close() = { + + override def close() = { open = false iter.close() } - def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove") + override def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove") - def hasNext() = iter.isValid + override def hasNext() = iter.isValid // The iterator is already pointing to the next element protected def peekKey() = { @@ -231,7 +232,7 @@ class RocksDbKeyValueStore( // current element we are pointing to and advance the iterator to the next // location (The new location may or may not be valid - this will surface // when the next next() call is made, the isValid will fail) - def next() = { + override def next() = { if (!hasNext()) { throw new NoSuchElementException } http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index 4141cbf..2aac6aa 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -24,7 +24,7 @@ import org.apache.samza.storage.{StoreProperties, StorageEngine} import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.util.TimerUtils -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ /** * A key value store. @@ -101,7 +101,7 @@ class KeyValueStorageEngine[K, V]( def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) { val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize) - for (envelope <- envelopes) { + for (envelope <- envelopes.asScala) { val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]] val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]] http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala index 3de257c..9e67fc8 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala @@ -21,7 +21,7 @@ package org.apache.samza.storage.kv import org.apache.samza.util.Util.notNull -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ object NullSafeKeyValueStore { val NullKeyErrorMessage = "Null is not a valid key." @@ -39,7 +39,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt def getAll(keys: java.util.List[K]): java.util.Map[K, V] = { notNull(keys, NullKeysErrorMessage) - keys.foreach(key => notNull(key, NullKeyErrorMessage)) + keys.asScala.foreach(key => notNull(key, NullKeyErrorMessage)) store.getAll(keys) } @@ -50,7 +50,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt } def putAll(entries: java.util.List[Entry[K, V]]) { - entries.foreach(entry => { + entries.asScala.foreach(entry => { notNull(entry.getKey, NullKeyErrorMessage) notNull(entry.getValue, NullValueErrorMessage) }) @@ -64,7 +64,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt def deleteAll(keys: java.util.List[K]) = { notNull(keys, NullKeysErrorMessage) - keys.foreach(key => notNull(key, NullKeyErrorMessage)) + keys.asScala.foreach(key => notNull(key, NullKeyErrorMessage)) store.deleteAll(keys) } http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala index d77d476..c8939b7 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala @@ -98,10 +98,10 @@ class SerializedKeyValueStore[K, V]( } private class DeserializingIterator(iter: KeyValueIterator[Array[Byte], Array[Byte]]) extends KeyValueIterator[K, V] { - def hasNext() = iter.hasNext() - def remove() = iter.remove() - def close() = iter.close() - def next(): Entry[K, V] = { + override def hasNext() = iter.hasNext() + override def remove() = iter.remove() + override def close() = iter.close() + override def next(): Entry[K, V] = { val nxt = iter.next() val key = fromBytesOrNull(nxt.getKey, keySerde) val value = fromBytesOrNull(nxt.getValue, msgSerde) http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala index 595dd0d..f57b275 100644 --- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala +++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala @@ -19,7 +19,7 @@ package org.apache.samza.storage.kv -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import java.util /** @@ -36,7 +36,7 @@ class MockKeyValueStore extends KeyValueStore[String, String] { } override def putAll(entries: java.util.List[Entry[String, String]]) { - for (entry <- entries) { + for (entry <- entries.asScala) { kvMap.put(entry.getKey, entry.getValue) } } http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala index 1ce7d25..5d1b497 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala @@ -37,7 +37,7 @@ import org.apache.samza.task.TaskInstanceCollector import org.apache.samza.util.{CommandLine, Logging, Util} import org.apache.samza.{Partition, SamzaException} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.Random /** @@ -258,7 +258,7 @@ class TestKeyValuePerformance extends Logging { store.flush() timer.reset().start() - assert(store.getAll(shuffledKeys).size == shuffledKeys.size) + assert(store.getAll(shuffledKeys.asJava).size == shuffledKeys.size) val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS) // Restore cache, in case it's enabled, to a state similar to the one above when the getAll test started @@ -312,9 +312,9 @@ class TestKeyValuePerformance extends Logging { val shuffledKeys = Random.shuffle(keys).take(messagesCountPerBatch) // We want to measure ::getAll when called many times, so populate the cache because first call is a cache-miss - val totalSize = store.getAll(shuffledKeys).values.map(_.length).sum + val totalSize = store.getAll(shuffledKeys.asJava).values.asScala.map(_.length).sum timer.reset().start() - assert(store.getAll(shuffledKeys).size == shuffledKeys.size) + assert(store.getAll(shuffledKeys.asJava).size == shuffledKeys.size) val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS) // We want to measure ::get when called many times, so populate the cache because first call is a cache-miss http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala index d7d23ec..babd15c 100644 --- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala +++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala @@ -36,7 +36,7 @@ import org.junit.Before import org.junit.Test import org.scalatest.Assertions.intercept -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer /** @@ -114,7 +114,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { def testGetAllWhenZeroMatch() { store.put(b("hello"), b("world")) val keys = List(b("foo"), b("bar")) - val actual = store.getAll(keys) + val actual = store.getAll(keys.asJava) keys.foreach(k => assertNull("Key: " + k, actual.get(k))) } @@ -122,18 +122,18 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { def testGetAllWhenFullMatch() { val expected = Map(b("k0") -> b("v0"), b("k1") -> b("v1")) expected.foreach(e => store.put(e._1, e._2)) - val actual = store.getAll(expected.keys.toList) + val actual = store.getAll(expected.keys.toList.asJava) assertEquals("Size", expected.size, actual.size) expected.foreach(e => assertArrayEquals("Value at: " + s(e._1), e._2, actual.get(e._1))) } @Test def testGetAllWhenPartialMatch() { - val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"), b("k2") -> b("v2")) - val found = all.entrySet.head - val notFound = all.entrySet.last + val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"), b("k2") -> b("v2")).asJava + val found = all.entrySet.asScala.head + val notFound = all.entrySet.asScala.last store.put(found.getKey, found.getValue) - val actual = store.getAll(List(notFound.getKey, found.getKey)) + val actual = store.getAll(List(notFound.getKey, found.getKey).asJava) assertNull(actual.get(notFound.getKey)) assertArrayEquals(found.getValue, actual.get(found.getKey)) } @@ -160,14 +160,14 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { intercept[NullPointerException] { store.get(null) } intercept[NullPointerException] { store.getAll(null) } - intercept[NullPointerException] { store.getAll(List(a, null)) } + intercept[NullPointerException] { store.getAll(List(a, null).asJava) } intercept[NullPointerException] { store.delete(null) } intercept[NullPointerException] { store.deleteAll(null) } - intercept[NullPointerException] { store.deleteAll(List(a, null)) } + intercept[NullPointerException] { store.deleteAll(List(a, null).asJava) } intercept[NullPointerException] { store.put(null, a) } intercept[NullPointerException] { store.put(a, null) } - intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) } - intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) } + intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null)).asJava) } + intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a)).asJava) } intercept[NullPointerException] { store.range(a, null) } intercept[NullPointerException] { store.range(null, a) } } @@ -182,7 +182,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { // from the cache's underlying store (rocksdb), but that == would fail. val numEntries = CacheSize - 1 val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i))) - store.putAll(entries) + store.putAll(entries.asJava) if (cache) { assertTrue("All values should be found and cached.", entries.forall(e => store.get(e.getKey) == e.getValue)) } else { @@ -225,7 +225,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { def testDeleteAllWhenZeroMatch() { val foo = b("foo") store.put(foo, foo) - store.deleteAll(List(b("bar"))) + store.deleteAll(List(b("bar")).asJava) assertArrayEquals(foo, store.get(foo)) } @@ -233,23 +233,23 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { def testDeleteAllWhenFullMatch() { val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1")) all.foreach(e => store.put(e._1, e._2)) - assertEquals(all.size, store.getAll(all.keys.toList).size) - store.deleteAll(all.keys.toList) + assertEquals(all.size, store.getAll(all.keys.toList.asJava).size) + store.deleteAll(all.keys.toList.asJava) all.keys.foreach(key => assertNull("Value at: " + s(key), store.get(key))) } @Test def testDeleteAllWhenPartialMatch() { - val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1")) - val found = all.entrySet.head - val leftAlone = all.entrySet.last - all.foreach(e => store.put(e._1, e._2)) + val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1")).asJava + val found = all.entrySet.asScala.head + val leftAlone = all.entrySet.asScala.last + all.asScala.foreach(e => store.put(e._1, e._2)) assertArrayEquals(found.getValue, store.get(found.getKey)) - store.deleteAll(List(b("not found"), found.getKey)) + store.deleteAll(List(b("not found"), found.getKey).asJava) store.flush() val allIterator = store.all try { - assertEquals(1, allIterator.size) + assertEquals(1, allIterator.asScala.size) assertArrayEquals(leftAlone.getValue, store.get(leftAlone.getKey)) } finally { allIterator.close() http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index b803dfe..7a107f6 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -45,7 +45,7 @@ import org.apache.samza.task._ import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMetadataStore} import org.junit.Assert._ -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, SynchronizedMap} /* @@ -70,8 +70,8 @@ object StreamTaskTestUtil { def zkConnect: String = s"127.0.0.1:$zkPort" var producer: Producer[Array[Byte], Array[Byte]] = null - val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123")) - val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345")) + val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123").asJava) + val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345").asJava) var metadataStore: TopicMetadataStore = null @@ -203,7 +203,7 @@ class StreamTaskTestUtil { */ def startJob = { // Start task. - val job = new JobRunner(new MapConfig(jobConfig)).run() + val job = new JobRunner(new MapConfig(jobConfig.asJava)).run() assertEquals(ApplicationStatus.Running, job.waitForStatus(ApplicationStatus.Running, 60000)) TestTask.awaitTaskRegistered val tasks = TestTask.tasks @@ -246,7 +246,7 @@ class StreamTaskTestUtil { val consumerConfig = new ConsumerConfig(props) val consumerConnector = Consumer.create(consumerConfig) - var stream = consumerConnector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0).iterator + val stream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head.iterator var message: MessageAndMetadata[Array[Byte], Array[Byte]] = null var messages = ArrayBuffer[String]() http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala index 06a107b..c10e7fb 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala @@ -26,7 +26,7 @@ import org.apache.samza.task.{MessageCollector, TaskContext, TaskCoordinator} import org.junit.Assert._ import org.junit.{AfterClass, BeforeClass, Test} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ object TestShutdownStatefulTask { val STORE_NAME = "loggedstore" @@ -119,7 +119,7 @@ class ShutdownStateStoreTask extends TestTask { .getStore(TestShutdownStatefulTask.STORE_NAME) .asInstanceOf[KeyValueStore[String, String]] val iter = store.all - iter.foreach( p => restored += (p.getKey -> p.getValue)) + iter.asScala.foreach( p => restored += (p.getKey -> p.getValue)) System.err.println("ShutdownStateStoreTask.init(): %s" format restored) iter.close } http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index 2240903..e5b6756 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -27,7 +27,7 @@ import org.apache.samza.task.{MessageCollector, TaskContext, TaskCoordinator} import org.junit.Assert._ import org.junit.{AfterClass, BeforeClass, Test} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ object TestStatefulTask { val STORE_NAME = "mystore" @@ -171,6 +171,7 @@ class StateStoreTestTask extends TestTask { store = context.getStore(TestStatefulTask.STORE_NAME).asInstanceOf[KeyValueStore[String, String]] val iter = store.all restored ++= iter + .asScala .map(_.getValue) .toSet System.err.println("StateStoreTestTask.init(): %s" format restored) http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala index e5aafbb..c7b1b6d 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala @@ -24,7 +24,7 @@ import org.apache.samza.config.{Config, JobConfig, YarnConfig} import org.apache.samza.coordinator.stream.CoordinatorStreamWriter import org.apache.samza.coordinator.stream.messages.SetConfig -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.Map import scala.collection.mutable.HashMap import org.apache.hadoop.conf.Configuration @@ -160,7 +160,7 @@ class ClientHelper(conf: Configuration) extends Logging { resource.setVirtualCores(cpu) info("set cpu core request to %s for %s" format (cpu, appId.get)) appCtx.setResource(resource) - containerCtx.setCommands(cmds.toList) + containerCtx.setCommands(cmds.asJava) info("set command to %s for %s" format (cmds, appId.get)) appCtx.setApplicationId(appId.get) @@ -173,7 +173,7 @@ class ClientHelper(conf: Configuration) extends Logging { // include the resources from the universal resource configurations try { val resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), new YarnConfiguration(conf)) - localResources ++= resourceMapper.getResourceMap + localResources ++= resourceMapper.getResourceMap.asScala } catch { case e: LocalizerResourceException => { throw new SamzaException("Exception during resource mapping from config. ", e) @@ -202,12 +202,12 @@ class ClientHelper(conf: Configuration) extends Logging { // prepare all local resources for localizer info("localResources is: %s" format localResources) - containerCtx.setLocalResources(localResources) + containerCtx.setLocalResources(localResources.asJava) info("set local resources on application master for %s" format appId.get) env match { case Some(env) => { - containerCtx.setEnvironment(env) + containerCtx.setEnvironment(env.asJava) info("set environment variables to %s for %s" format (env, appId.get)) } case None => @@ -232,8 +232,8 @@ class ClientHelper(conf: Configuration) extends Logging { def getApplicationMaster(appId: ApplicationId): Option[ApplicationReport] = { yarnClient .getApplications - .filter(appRep => appId.equals(appRep.getApplicationId())) - .headOption + .asScala + .find(appRep => appId.equals(appRep.getApplicationId)) } def getApplicationMasters(status: Option[ApplicationStatus]): List[ApplicationReport] = { @@ -241,9 +241,10 @@ class ClientHelper(conf: Configuration) extends Logging { status match { case Some(status) => getAppsRsp + .asScala .filter(appRep => status.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)) .toList - case None => getAppsRsp.toList + case None => getAppsRsp.asScala.toList } } http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala index f057594..2d8a3f1 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.samza.config.Config import org.apache.samza.util.hadoop.HttpFileSystem import org.apache.samza.util.Logging -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ class YarnJobFactory extends StreamJobFactory with Logging { def getJob(config: Config) = { @@ -42,7 +42,7 @@ class YarnJobFactory extends StreamJobFactory with Logging { // Use the Samza job config "fs.<scheme>.impl" to override YarnConfiguration val fsImplConfig = new FileSystemImplConfig(config) - fsImplConfig.getSchemes.foreach( + fsImplConfig.getSchemes.asScala.foreach( (scheme : String) => hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme)) ) http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala index cdd389c..122a1df 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala @@ -25,12 +25,12 @@ import scalate.ScalateSupport import org.apache.samza.config.Config import org.apache.samza.job.yarn.{YarnAppState, ClientHelper} import org.apache.samza.metrics._ -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.hadoop.yarn.conf.YarnConfiguration import java.util.HashMap import org.apache.samza.serializers.model.SamzaObjectMapper -class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport { +class ApplicationMasterRestServlet(samzaConfig: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport { val yarnConfig = new YarnConfiguration val client = new ClientHelper(yarnConfig) val jsonMapper = SamzaObjectMapper.getObjectMapper @@ -43,10 +43,10 @@ class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicati val metricMap = new HashMap[String, java.util.Map[String, Object]] // build metric map - registry.getGroups.foreach(group => { + registry.getGroups.asScala.foreach(group => { val groupMap = new HashMap[String, Object] - registry.getGroup(group).foreach { + registry.getGroup(group).asScala.foreach { case (name, metric) => metric.visit(new MetricsVisitor() { def counter(counter: Counter) = @@ -79,7 +79,7 @@ class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicati get("/am") { val containers = new HashMap[String, HashMap[String, Object]] - state.runningYarnContainers.foreach { + state.runningYarnContainers.asScala.foreach { case (containerId, container) => val yarnContainerId = container.id.toString val containerMap = new HashMap[String, Object] @@ -98,10 +98,10 @@ class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicati "containers" -> containers, "host" -> "%s:%s".format(state.nodeHost, state.rpcUrl.getPort)) - jsonMapper.writeValueAsString(new HashMap[String, Object](status)) + jsonMapper.writeValueAsString(new HashMap[String, Object](status.asJava)) } get("/config") { - jsonMapper.writeValueAsString(new HashMap[String, Object](config.sanitize.toMap)) + jsonMapper.writeValueAsString(new HashMap[String, Object](samzaConfig.sanitize)) } } http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala index a32cd65..d787f9e 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala @@ -24,12 +24,12 @@ import org.scalatra._ import scalate.ScalateSupport import org.apache.samza.job.yarn.YarnAppState import org.apache.samza.config.Config -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.immutable.TreeMap import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.webapp.util.WebAppUtils -class ApplicationMasterWebServlet(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState) extends ScalatraServlet with ScalateSupport { +class ApplicationMasterWebServlet(samzaConfig: Config, samzaAppState: SamzaApplicationState, state: YarnAppState) extends ScalatraServlet with ScalateSupport { val yarnConfig = new YarnConfiguration before() { @@ -38,7 +38,7 @@ class ApplicationMasterWebServlet(config: Config, samzaAppState: SamzaApplicatio get("/") { layoutTemplate("/WEB-INF/views/index.scaml", - "config" -> TreeMap(config.sanitize.toMap.toArray: _*), + "config" -> TreeMap(samzaConfig.sanitize.asScala.toMap.toArray: _*), "state" -> state, "samzaAppState" -> samzaAppState, "rmHttpAddress" -> WebAppUtils.getRMWebAppURLWithScheme(yarnConfig)) http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala index d3d34f2..c320a97 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala @@ -21,8 +21,8 @@ package org.apache.samza.job.yarn import org.apache.samza.Partition import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory} -import scala.collection.JavaConversions._ +import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin} +import scala.collection.JavaConverters._ /** * A mock implementation class that returns metadata for each stream that contains numTasks partitions in it. @@ -30,12 +30,12 @@ import scala.collection.JavaConversions._ class MockSystemAdmin(numTasks: Int) extends SystemAdmin { def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null def getSystemStreamMetadata(streamNames: java.util.Set[String]) = { - streamNames.map(streamName => { - var partitionMetadata = (0 until numTasks).map(partitionId => { + streamNames.asScala.map(streamName => { + val partitionMetadata = (0 until numTasks).map(partitionId => { new Partition(partitionId) -> new SystemStreamPartitionMetadata(null, null, null) }).toMap - streamName -> new SystemStreamMetadata(streamName, partitionMetadata) - }).toMap[String, SystemStreamMetadata] + streamName -> new SystemStreamMetadata(streamName, partitionMetadata.asJava) + }).toMap.asJava } override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) { http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala index 5c15385..ad8337b 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala @@ -24,11 +24,11 @@ import org.apache.hadoop.fs.{FileStatus, Path, FileSystem} import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.samza.SamzaException -import org.apache.samza.config.{MapConfig, JobConfig, Config, YarnConfig} +import org.apache.samza.config.{MapConfig, JobConfig, YarnConfig} import org.mockito.Mockito._ import org.mockito.Matchers.any import org.scalatest.FunSuite -import org.scalatest.mock.MockitoSugar +import org.scalatest.mockito.MockitoSugar class TestClientHelper extends FunSuite { http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala index 1dd0c18..65c03d1 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala @@ -23,17 +23,12 @@ import java.io.BufferedReader import java.net.URL import java.io.InputStreamReader import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.samza.Partition import org.apache.samza.clustermanager.SamzaApplicationState import org.apache.samza.config.MapConfig -import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory} import org.junit.Assert._ import org.junit.Test -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.samza.config.Config -import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.container.TaskName import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory @@ -113,7 +108,7 @@ class TestSamzaYarnAppMasterService { "yarn.container.retry.count" -> "1", "yarn.container.retry.window.ms" -> "1999999999", "job.coordinator.system" -> "coordinator", - "systems.coordinator.samza.factory" -> classOf[MockCoordinatorStreamSystemFactory].getCanonicalName)) + "systems.coordinator.samza.factory" -> classOf[MockCoordinatorStreamSystemFactory].getCanonicalName).asJava) }