Repository: incubator-samza Updated Branches: refs/heads/master d497a4079 -> 3161c640a
SAMZA-163: Remove mutable.ConcurrentMap when we stop supporting 2.9 Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/3161c640 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/3161c640 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/3161c640 Branch: refs/heads/master Commit: 3161c640a4cfada839668054d9e8fd8b9b6cd046 Parents: d497a40 Author: Jakob Homan <[email protected]> Authored: Mon Dec 15 10:35:15 2014 -0800 Committer: Jakob Homan <[email protected]> Committed: Mon Dec 15 10:35:15 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/samza/metrics/Snapshot.java | 1 + .../apache/samza/system/kafka/BrokerProxy.scala | 17 +++++++++-------- .../samza/util/ClientUtilTopicMetadataStore.scala | 3 --- .../samza/test/integration/SimpleStatefulTask.java | 3 ++- .../samza/test/integration/StatePerfTestTask.java | 3 ++- .../samza/test/integration/join/Checker.java | 1 + 6 files changed, 15 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java index 7666909..4c7b525 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/Snapshot.java @@ -90,6 +90,7 @@ public class Snapshot { * * @return the list of values */ + @SuppressWarnings("unchecked") public ArrayList<Long> getValues() { return (ArrayList<Long>) values.clone(); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index ba1aa9b..9daf824 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -21,18 +21,19 @@ package org.apache.samza.system.kafka +import java.nio.channels.ClosedByInterruptException +import java.util.Map.Entry +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} import kafka.api._ import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition} -import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} -import scala.collection.JavaConversions._ +import kafka.consumer.ConsumerConfig import kafka.message.MessageSet +import org.apache.samza.util.ExponentialSleepStrategy import org.apache.samza.util.Logging -import java.nio.channels.ClosedByInterruptException -import java.util.Map.Entry -import scala.collection.mutable -import kafka.consumer.ConsumerConfig import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX -import org.apache.samza.util.ExponentialSleepStrategy +import scala.collection.JavaConversions._ +import scala.collection.concurrent +import scala.collection.mutable /** * Companion object for class JvmMetrics encapsulating various constants @@ -65,7 +66,7 @@ class BrokerProxy( val sleepMSWhileNoTopicPartitions = 100 /** What's the next offset for a particular partition? **/ - val nextOffsets:mutable.ConcurrentMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]() + val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]() /** Block on the first call to get message if the fetcher has not yet returned its initial results **/ // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala index 1f415d2..0f91622 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/ClientUtilTopicMetadataStore.scala @@ -22,9 +22,6 @@ package org.apache.samza.util import kafka.api.{ TopicMetadataResponse, TopicMetadata } import org.apache.samza.SamzaException import kafka.client.ClientUtils -import org.apache.samza.util.Logging -import kafka.common.ErrorMapping -import kafka.cluster.Broker import java.util.concurrent.atomic.AtomicInteger trait TopicMetadataStore extends Logging { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java index 52a8059..4dbcb75 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java @@ -34,7 +34,8 @@ import org.apache.samza.task.TaskCoordinator.RequestScope; public class SimpleStatefulTask implements StreamTask, InitableTask { private KeyValueStore<String, String> store; - + + @SuppressWarnings("unchecked") public void init(Config config, TaskContext context) { this.store = (KeyValueStore<String, String>) context.getStore("mystore"); System.out.println("Contents of store: "); http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java index 25417a6..77f770e 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java @@ -35,7 +35,8 @@ public class StatePerfTestTask implements StreamTask, InitableTask { private int count = 0; private int LOG_INTERVAL = 100000; private long start = System.currentTimeMillis(); - + + @SuppressWarnings("unchecked") public void init(Config config, TaskContext context) { this.store = (KeyValueStore<String, String>) context.getStore("mystore"); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3161c640/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java index 3012b19..2a2177a 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java @@ -41,6 +41,7 @@ public class Checker implements StreamTask, WindowableTask, InitableTask { private int numPartitions; @Override + @SuppressWarnings("unchecked") public void init(Config config, TaskContext context) { this.store = (KeyValueStore<String, String>) context.getStore("checker-state"); this.expectedKeys = config.getInt("expected.keys");
