Replaced KafkaSystemConsumer, based on SimpleConsumer, with NewKafkaSystemConsumer, based on the high-level Kafka consumer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/332a0481 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/332a0481 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/332a0481 Branch: refs/heads/NewKafkaSystemConsumer Commit: 332a04815bbc5d526b736d82e5f05262b0922d57 Parents: bab5bdd Author: Boris S <[email protected]> Authored: Wed Sep 5 11:51:58 2018 -0700 Committer: Boris S <[email protected]> Committed: Wed Sep 5 11:51:58 2018 -0700 ---------------------------------------------------------------------- .../samza/system/IncomingMessageEnvelope.java | 3 +- .../ClusterBasedJobCoordinator.java | 2 +- .../stream/CoordinatorStreamSystemConsumer.java | 4 +- .../apache/samza/storage/StorageRecovery.java | 2 +- .../samza/checkpoint/CheckpointTool.scala | 2 +- .../apache/samza/checkpoint/OffsetManager.scala | 4 +- .../samza/coordinator/JobModelManager.scala | 5 +- .../samza/job/local/ProcessJobFactory.scala | 3 +- .../samza/job/local/ThreadJobFactory.scala | 14 +- .../samza/coordinator/TestJobCoordinator.scala | 4 +- .../clients/consumer/KafkaConsumerConfig.java | 81 ++-- .../samza/system/kafka/KafkaConsumerProxy.java | 32 +- .../kafka/KafkaSystemConsumerMetrics.scala | 69 ++- .../samza/system/kafka/KafkaSystemFactory.scala | 47 +- .../system/kafka/NewKafkaSystemConsumer.java | 93 ++-- .../samza/system/kafka/TestBrokerProxy.scala | 437 ------------------- .../test/integration/StreamTaskTestUtil.scala | 8 +- 17 files changed, 170 insertions(+), 640 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java 
b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java index 4d0ce2f..c5aed31 100644 --- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java +++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java @@ -59,7 +59,8 @@ public class IncomingMessageEnvelope { * @param message A deserialized message received from the partition offset. * @param size size of the message and key in bytes. */ - public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message, int size) { + public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, + Object key, Object message, int size) { this.systemStreamPartition = systemStreamPartition; this.offset = offset; this.key = key; http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 016d171..12e26f7 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -174,7 +174,7 @@ public class ClusterBasedJobCoordinator { // build a JobModelManager and ChangelogStreamManager and perform partition assignments. 
changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager); - jobModelManager = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()); + jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()); config = jobModelManager.jobModel().getConfig(); hasDurableStores = new StorageConfig(config).hasDurableStores(); http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java index 0bdb874..38255a2 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java @@ -176,7 +176,7 @@ public class CoordinatorStreamSystemConsumer { valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage()); } CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap); - log.info("Received coordinator stream message: {}", coordinatorStreamMessage); + log.debug("Received coordinator stream message: {}", coordinatorStreamMessage); // Remove any existing entry. Set.add() does not add if the element already exists. 
if (bootstrappedMessages.remove(coordinatorStreamMessage)) { log.debug("Removed duplicate message: {}", coordinatorStreamMessage); @@ -194,7 +194,7 @@ public class CoordinatorStreamSystemConsumer { } bootstrappedStreamSet = Collections.unmodifiableSet(bootstrappedMessages); - log.info("Bootstrapped configuration: {}", configMap); + log.debug("Bootstrapped configuration: {}", configMap); isBootstrapped = true; } catch (Exception e) { throw new SamzaException(e); http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index f9c6c0c..c6dd9a7 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -131,7 +131,7 @@ public class StorageRecovery extends CommandLine { coordinatorStreamManager.start(); coordinatorStreamManager.bootstrap(); ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager); - JobModel jobModel = JobModelManager.apply(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()).jobModel(); + JobModel jobModel = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()).jobModel(); containers = jobModel.getContainers(); coordinatorStreamManager.stop(); } http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala index 0ca8a3d..65fb419 100644 
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala @@ -170,7 +170,7 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manage coordinatorStreamManager.start coordinatorStreamManager.bootstrap val changelogManager = new ChangelogStreamManager(coordinatorStreamManager) - val jobModelManager = JobModelManager(coordinatorStreamManager, changelogManager.readPartitionMapping()) + val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, changelogManager.readPartitionMapping()) val taskNames = jobModelManager .jobModel .getContainers http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index d2b6667..53d5e98 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -304,7 +304,7 @@ class OffsetManager( */ private def loadOffsetsFromCheckpointManager { if (checkpointManager != null) { - info("Loading offsets from checkpoint manager.") + debug("Loading offsets from checkpoint manager.") checkpointManager.start val result = systemStreamPartitions @@ -332,7 +332,7 @@ class OffsetManager( * Loads last processed offsets for a single taskName. */ private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[TaskName, Map[SystemStreamPartition, String]] = { - info("Loading checkpoints for taskName: %s." format taskName) + debug("Loading checkpoints for taskName: %s." 
format taskName) val checkpoint = checkpointManager.readLastCheckpoint(taskName) http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index f939736..f7ffd4e 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -64,12 +64,11 @@ object JobModelManager extends Logging { * a) Reads the jobModel from coordinator stream using the job's configuration. * b) Recomputes changelog partition mapping based on jobModel and job's configuration. * c) Builds JobModelManager using the jobModel read from coordinator stream. - * @param coordinatorStreamManager Coordinator stream manager. + * @param config Coordinator stream manager config * @param changelogPartitionMapping The changelog partition-to-task mapping. 
* @return JobModelManager */ - def apply(coordinatorStreamManager: CoordinatorStreamManager, changelogPartitionMapping: util.Map[TaskName, Integer]) = { - val config = coordinatorStreamManager.getConfig + def apply(config: Config, changelogPartitionMapping: util.Map[TaskName, Integer]) = { val localityManager = new LocalityManager(config, new MetricsRegistryMap()) // Map the name of each system to the corresponding SystemAdmin http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala index 642a484..64f516b 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala @@ -50,7 +50,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging { coordinatorStreamManager.bootstrap val changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager) - val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()) + val coordinator = JobModelManager(coordinatorStreamManager.getConfig, changelogStreamManager.readPartitionMapping()) val jobModel = coordinator.jobModel val taskPartitionMappings: util.Map[TaskName, Integer] = new util.HashMap[TaskName, Integer] @@ -61,6 +61,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging { } changelogStreamManager.writePartitionMapping(taskPartitionMappings) + coordinatorStreamManager.stop() //create necessary checkpoint and changelog streams val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry) 
http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index 34cc2a0..15aa5a6 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -19,11 +19,9 @@ package org.apache.samza.job.local -import java.util.concurrent.{CountDownLatch, TimeUnit} - -import org.apache.samza.config.{Config, TaskConfigJava} import org.apache.samza.config.JobConfig._ import org.apache.samza.config.ShellCommandConfig._ +import org.apache.samza.config.{Config, TaskConfigJava} import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName} import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.stream.CoordinatorStreamManager @@ -38,8 +36,8 @@ import scala.collection.JavaConversions._ import scala.collection.mutable /** - * Creates a new Thread job with the given config - */ + * Creates a new Thread job with the given config + */ class ThreadJobFactory extends StreamJobFactory with Logging { def getJob(config: Config): StreamJob = { info("Creating a ThreadJob, which is only meant for debugging.") @@ -51,7 +49,8 @@ class ThreadJobFactory extends StreamJobFactory with Logging { coordinatorStreamManager.bootstrap val changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager) - val coordinator = JobModelManager(coordinatorStreamManager, changelogStreamManager.readPartitionMapping()) + val coordinator = JobModelManager(coordinatorStreamManager.getConfig, changelogStreamManager.readPartitionMapping()) + coordinatorStreamManager.stop() val jobModel = coordinator.jobModel val taskPartitionMappings: 
mutable.Map[TaskName, Integer] = mutable.Map[TaskName, Integer]() @@ -85,7 +84,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging { // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job. config.getTaskOpts match { - case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. You probably want to run %s=%s." format (TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName)) + case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. You probably want to run %s=%s." format(TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName)) case _ => None } @@ -117,7 +116,6 @@ class ThreadJobFactory extends StreamJobFactory with Logging { threadJob } finally { coordinator.stop - coordinatorStreamManager.stop jmxServer.stop } } http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index 42610ae..b85b4a4 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -275,7 +275,9 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { coordinatorStreamManager.start coordinatorStreamManager.bootstrap val changelogPartitionManager = new ChangelogStreamManager(coordinatorStreamManager) - JobModelManager(coordinatorStreamManager, changelogPartitionManager.readPartitionMapping()) + val jobModelManager = JobModelManager(coordinatorStreamManager.getConfig, 
changelogPartitionManager.readPartitionMapping()) + coordinatorStreamManager.stop() + jobModelManager } @Before http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java index 88437ee..843e03d 100644 --- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java +++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java @@ -43,11 +43,13 @@ public class KafkaConsumerConfig extends ConsumerConfig { private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer"; private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer"; + private static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin"; private static final String SAMZA_OFFSET_LARGEST = "largest"; private static final String SAMZA_OFFSET_SMALLEST = "smallest"; private static final String KAFKA_OFFSET_LATEST = "latest"; private static final String KAFKA_OFFSET_EARLIEST = "earliest"; private static final String KAFKA_OFFSET_NONE = "none"; + /* * By default, KafkaConsumer will fetch ALL available messages for all the partitions. * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll(). 
@@ -59,8 +61,8 @@ public class KafkaConsumerConfig extends ConsumerConfig { super(props); } - public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, - String systemName, String clientId, Map<String, String> injectProps) { + public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId, + Map<String, String> injectProps) { Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true); @@ -72,17 +74,20 @@ public class KafkaConsumerConfig extends ConsumerConfig { consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId); - //Open-source Kafka Consumer configuration - consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Disable consumer auto-commit + //Kafka client configuration + + // Disable consumer auto-commit because Samza controls commits + consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - consumerProps.setProperty( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - getAutoOffsetResetValue(consumerProps)); // Translate samza config value to kafka config value + // Translate samza config value to kafka config value + consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + getAutoOffsetResetValue(consumerProps)); // make sure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT? - if (! 
subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { // get it from the producer config - String bootstrapServer = config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + String bootstrapServer = + config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); if (StringUtils.isEmpty(bootstrapServer)) { throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName); } @@ -90,25 +95,22 @@ public class KafkaConsumerConfig extends ConsumerConfig { } // Always use default partition assignment strategy. Do not allow override. - consumerProps.setProperty( - ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, - RangeAssignor.class.getName()); - + consumerProps.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); // the consumer is fully typed, and deserialization can be too. 
But in case it is not provided we should // default to byte[] - if ( !config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { - LOG.info("default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); + if (!config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { + LOG.info("setting default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); } - if ( !config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { - LOG.info("default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); + if (!config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { + LOG.info("setting default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); } - // NOT SURE THIS IS NEEDED TODO - String maxPollRecords = subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT);; + String maxPollRecords = + subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT); consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // put overrides @@ -122,38 +124,37 @@ public class KafkaConsumerConfig extends ConsumerConfig { JobConfig jobConfig = new JobConfig(config); Option<String> jobIdOption = jobConfig.getJobId(); Option<String> jobNameOption = jobConfig.getName(); - return (jobNameOption.isDefined()? jobNameOption.get() : "undefined_job_name") + "-" - + (jobIdOption.isDefined()? jobIdOption.get() : "undefined_job_id"); + return (jobNameOption.isDefined() ? jobNameOption.get() : "undefined_job_name") + "-" + (jobIdOption.isDefined() + ? 
jobIdOption.get() : "undefined_job_id"); } + // client id should be unique per job - public static String getClientId(String id, Config config) { + public static String getClientId(Config config) { + return getClientId(CONSUMER_CLIENT_ID_PREFIX, config); + } + public static String getProducerClientId(Config config) { + return getClientId(PRODUCER_CLIENT_ID_PREFIX, config); + } + public static String getAdminClientId(Config config) { + return getClientId(ADMIN_CLIENT_ID_PREFIX, config); + } + + private static String getClientId(String id, Config config) { if (config.get(JobConfig.JOB_NAME()) == null) { throw new ConfigException("Missing job name"); } String jobName = config.get(JobConfig.JOB_NAME()); - String jobId = "1"; - if (config.get(JobConfig.JOB_ID()) != null) { - jobId = config.get(JobConfig.JOB_ID()); - } - return getClientId(id, jobName, jobId); - } + String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1"; - private static String getClientId(String id, String jobName, String jobId) { - return String.format( - "%s-%s-%s", - id.replaceAll("[^A-Za-z0-9]", "_"), - jobName.replaceAll("[^A-Za-z0-9]", "_"), + return String.format("%s-%s-%s", id.replaceAll("[^A-Za-z0-9]", "_"), jobName.replaceAll("[^A-Za-z0-9]", "_"), jobId.replaceAll("[^A-Za-z0-9]", "_")); } - public static String getProducerClientId(Config config) { - return getClientId(PRODUCER_CLIENT_ID_PREFIX, config); - } - /** * Settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset) - need to convert * "largest" -> "latest" * "smallest" -> "earliest" + * "none" -> "none" * "none" - will fail the kafka consumer, if offset is out of range * @param properties All consumer related {@link Properties} parsed from samza config * @return String representing the config value for "auto.offset.reset" property @@ -162,9 +163,8 @@ public class KafkaConsumerConfig extends ConsumerConfig { String autoOffsetReset = 
properties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KAFKA_OFFSET_LATEST); // accept kafka values directly - if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || - autoOffsetReset.equals(KAFKA_OFFSET_LATEST) || - autoOffsetReset.equals(KAFKA_OFFSET_NONE)) { + if (autoOffsetReset.equals(KAFKA_OFFSET_EARLIEST) || autoOffsetReset.equals(KAFKA_OFFSET_LATEST) + || autoOffsetReset.equals(KAFKA_OFFSET_NONE)) { return autoOffsetReset; } @@ -177,5 +177,4 @@ public class KafkaConsumerConfig extends ConsumerConfig { return KAFKA_OFFSET_LATEST; } } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index cddfdfd..a6272cd 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -86,7 +86,7 @@ public class KafkaConsumerProxy<K, V> { this.clientId = clientId; // TODO - see if we need new metrics (not host:port based) - this.kafkaConsumerMetrics.registerBrokerProxy(metricName, 0); + this.kafkaConsumerMetrics.registerClientProxy(metricName); consumerPollThread = new Thread(createProxyThreadRunnable()); } @@ -132,7 +132,7 @@ public class KafkaConsumerProxy<K, V> { // we reuse existing metrics. 
They assume host and port for the broker // for now fake the port with the consumer name - kafkaConsumerMetrics.setTopicPartitionValue(metricName, 0, nextOffsets.size()); + kafkaConsumerMetrics.setTopicPartitionValue(metricName, nextOffsets.size()); } /** @@ -258,16 +258,10 @@ public class KafkaConsumerProxy<K, V> { results.put(ssp, listMsgs); } - // TODO - add calculation of the size of the message, when available from Kafka - int msgSize = 0; - // if (fetchLimitByBytesEnabled) { - msgSize = getRecordSize(r); - //} - final K key = r.key(); final Object value = r.value(); IncomingMessageEnvelope imEnvelope = - new IncomingMessageEnvelope(ssp, String.valueOf(r.offset()), key, value, msgSize); + new IncomingMessageEnvelope(ssp, String.valueOf(r.offset()), key, value, getRecordSize(r)); listMsgs.add(imEnvelope); } if (LOG.isDebugEnabled()) { @@ -282,18 +276,8 @@ public class KafkaConsumerProxy<K, V> { } private int getRecordSize(ConsumerRecord<K, V> r) { - int keySize = 0; //(r.key() == null) ? 0 : r.key().getSerializedKeySize(); - return keySize; // + r.getSerializedMsgSize(); // TODO -enable when functionality available from Kafka - - //int getMessageSize (Message message) { - // Approximate additional shallow heap overhead per message in addition to the raw bytes - // received from Kafka 4 + 64 + 4 + 4 + 4 = 80 bytes overhead. - // As this overhead is a moving target, and not very large - // compared to the message size its being ignore in the computation for now. - // int MESSAGE_SIZE_OVERHEAD = 4 + 64 + 4 + 4 + 4; - - // return message.size() + MESSAGE_SIZE_OVERHEAD; - // } + int keySize = (r.key() == null) ? 
0 : r.serializedKeySize(); + return keySize + r.serializedValueSize(); } private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) { @@ -310,7 +294,7 @@ public class KafkaConsumerProxy<K, V> { kafkaConsumerMetrics.incReads(tap); kafkaConsumerMetrics.incBytesReads(tap, size); kafkaConsumerMetrics.setOffsets(tap, recordOffset); - kafkaConsumerMetrics.incBrokerBytesReads(metricName, 0, size); + kafkaConsumerMetrics.incClientBytesReads(metricName, size); kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark); } @@ -398,7 +382,7 @@ public class KafkaConsumerProxy<K, V> { } LOG.debug("pollConsumer {}", SSPsToFetch.size()); if (!SSPsToFetch.isEmpty()) { - kafkaConsumerMetrics.incBrokerReads(metricName, 0); + kafkaConsumerMetrics.incClientReads(metricName); Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response; if (LOG.isDebugEnabled()) { @@ -420,7 +404,7 @@ public class KafkaConsumerProxy<K, V> { LOG.debug("No topic/partitions need to be fetched for consumer {} right now. 
Sleeping {}ms.", kafkaConsumer, SLEEP_MS_WHILE_NO_TOPIC_PARTITION); - kafkaConsumerMetrics.incBrokerSkippedFetchRequests(metricName, 0); + kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName); try { Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION); http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala index 1aa66dc..415bd38 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala @@ -19,13 +19,10 @@ package org.apache.samza.system.kafka -import org.apache.samza.metrics.MetricsHelper -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.metrics.MetricsRegistry import java.util.concurrent.ConcurrentHashMap + import kafka.common.TopicAndPartition -import org.apache.samza.metrics.Counter -import org.apache.samza.metrics.Gauge +import org.apache.samza.metrics._ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper { val offsets = new ConcurrentHashMap[TopicAndPartition, Counter] @@ -34,68 +31,66 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr val lag = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]] val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]] - /* - TODO Fix - * (String, Int) = (host, port) of BrokerProxy. 
- */ - - val reconnects = new ConcurrentHashMap[(String, Int), Counter] - val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter] - val brokerReads = new ConcurrentHashMap[(String, Int), Counter] - val brokerSkippedFetchRequests = new ConcurrentHashMap[(String, Int), Counter] - val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]] + val clientBytesRead = new ConcurrentHashMap[String, Counter] + val clientReads = new ConcurrentHashMap[String, Counter] + val clientSkippedFetchRequests = new ConcurrentHashMap[String, Counter] + val topicPartitions = new ConcurrentHashMap[String, Gauge[Int]] def registerTopicAndPartition(tp: TopicAndPartition) = { if (!offsets.contains(tp)) { - offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, tp.partition))) - bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, tp.partition))) - reads.put(tp, newCounter("%s-%s-messages-read" format (tp.topic, tp.partition))) - highWatermark.put(tp, newGauge("%s-%s-high-watermark" format (tp.topic, tp.partition), -1L)) - lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format (tp.topic, tp.partition), 0L)) + offsets.put(tp, newCounter("%s-%s-offset-change" format(tp.topic, tp.partition))) + bytesRead.put(tp, newCounter("%s-%s-bytes-read" format(tp.topic, tp.partition))) + reads.put(tp, newCounter("%s-%s-messages-read" format(tp.topic, tp.partition))) + highWatermark.put(tp, newGauge("%s-%s-high-watermark" format(tp.topic, tp.partition), -1L)) + lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format(tp.topic, tp.partition), 0L)) } } - def registerBrokerProxy(host: String, port: Int) { - reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, port))) - brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format (host, port))) - brokerReads.put((host, port), newCounter("%s-%s-messages-read" format (host, port))) - brokerSkippedFetchRequests.put((host, port), 
newCounter("%s-%s-skipped-fetch-requests" format (host, port))) - topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format (host, port), 0)) + def registerClientProxy(clientName: String) { + clientBytesRead.put(clientName, newCounter("%s-bytes-read" format clientName)) + clientReads.put((clientName), newCounter("%s-messages-read" format clientName)) + clientSkippedFetchRequests.put((clientName), newCounter("%s-skipped-fetch-requests" format clientName)) + topicPartitions.put(clientName, newGauge("%s-topic-partitions" format clientName, 0)) } // java friendlier interfaces // Gauges - def setTopicPartitionValue(host: String, port: Int, value: Int) { - topicPartitions.get((host,port)).set(value) + def setTopicPartitionValue(clientName: String, value: Int) { + topicPartitions.get(clientName).set(value) } + def setLagValue(topicAndPartition: TopicAndPartition, value: Long) { lag.get((topicAndPartition)).set(value); } + def setHighWatermarkValue(topicAndPartition: TopicAndPartition, value: Long) { highWatermark.get((topicAndPartition)).set(value); } // Counters - def incBrokerReads(host: String, port: Int) { - brokerReads.get((host,port)).inc + def incClientReads(clientName: String) { + clientReads.get(clientName).inc } + def incReads(topicAndPartition: TopicAndPartition) { reads.get(topicAndPartition).inc; } + def incBytesReads(topicAndPartition: TopicAndPartition, inc: Long) { bytesRead.get(topicAndPartition).inc(inc); } - def incBrokerBytesReads(host: String, port: Int, incBytes: Long) { - brokerBytesRead.get((host,port)).inc(incBytes) + + def incClientBytesReads(clientName: String, incBytes: Long) { + clientBytesRead.get(clientName).inc(incBytes) } - def incBrokerSkippedFetchRequests(host: String, port: Int) { - brokerSkippedFetchRequests.get((host,port)).inc() + + def incClientSkippedFetchRequests(clientName: String) { + clientSkippedFetchRequests.get(clientName).inc() } + def setOffsets(topicAndPartition: TopicAndPartition, offset:
Long) { offsets.get(topicAndPartition).set(offset) } - def incReconnects(host: String, port: Int) { - reconnects.get((host,port)).inc() - } + override def getPrefix = systemName + "-" } http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index 6a5eda9..892d400 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -19,27 +19,21 @@ package org.apache.samza.system.kafka -import java.util import java.util.Properties -import kafka.consumer.ConsumerConfig import kafka.utils.ZkUtils -import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.consumer.KafkaConsumerConfig +import org.apache.kafka.clients.producer.KafkaProducer import org.apache.samza.SamzaException import org.apache.samza.config.ApplicationConfig.ApplicationMode -import org.apache.samza.util._ -import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, StreamConfig} -import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.config.KafkaConfig.Config2Kafka -import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.samza.system.SystemFactory import org.apache.samza.config.StorageConfig._ -import org.apache.samza.system.SystemProducer -import org.apache.samza.system.SystemAdmin import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.system.SystemConsumer +import org.apache.samza.config.TaskConfig.Config2Task +import 
org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, StreamConfig} +import org.apache.samza.metrics.MetricsRegistry +import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer} +import org.apache.samza.util._ object KafkaSystemFactory extends Logging { def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) { @@ -51,8 +45,9 @@ object KafkaSystemFactory extends Logging { } class KafkaSystemFactory extends SystemFactory with Logging { + def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = { - val clientId = KafkaUtil.getClientId("samza-consumer", config) + val clientId = KafkaConsumerConfig.getClientId( config) val metrics = new KafkaSystemConsumerMetrics(systemName, registry) NewKafkaSystemConsumer.getNewKafkaSystemConsumer( @@ -60,10 +55,12 @@ class KafkaSystemFactory extends SystemFactory with Logging { } def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = { - val clientId = KafkaUtil.getClientId("samza-producer", config) + val clientId = KafkaConsumerConfig.getProducerClientId(config) val injectedProps = KafkaSystemFactory.getInjectedProducerProperties(systemName, config) val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId, injectedProps) - val getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) } + val getProducer = () => { + new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) + } val metrics = new KafkaSystemProducerMetrics(systemName, registry) // Unlike consumer, no need to use encoders here, since they come for free @@ -79,7 +76,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { } def getAdmin(systemName: String, config: Config): SystemAdmin = { - val clientId = KafkaUtil.getClientId("samza-admin", config) + val clientId = 
KafkaConsumerConfig.getClientId(config) val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId) val bootstrapServers = producerConfig.bootsrapServers val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) @@ -94,13 +91,13 @@ class KafkaSystemFactory extends SystemFactory with Logging { val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt val storeToChangelog = config.getKafkaChangelogEnabledStores() // Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream. - val topicMetaInformation = storeToChangelog.map{case (storeName, topicName) => - { - val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt - val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName)) - info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor)) - (topicName, changelogInfo) - }} + val topicMetaInformation = storeToChangelog.map { case (storeName, topicName) => { + val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt + val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName)) + info("Creating topic meta information for topic: %s with replication factor: %s" format(topicName, replicationFactor)) + (topicName, changelogInfo) + } + } val deleteCommittedMessages = config.deleteCommittedMessages(systemName).exists(isEnabled => isEnabled.toBoolean) val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config) @@ -125,7 +122,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props } } - def getIntermediateStreamProperties(config : Config): Map[String, Properties] = { + def 
getIntermediateStreamProperties(config: Config): Map[String, Properties] = { val appConfig = new ApplicationConfig(config) if (appConfig.getAppMode == ApplicationMode.BATCH) { val streamConfig = new StreamConfig(config) http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java index b33db42..717b45d 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java @@ -53,12 +53,12 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements private static final long FETCH_THRESHOLD = 50000; private static final long FETCH_THRESHOLD_BYTES = -1L; + private final Consumer<K, V> kafkaConsumer; private final String systemName; private final KafkaSystemConsumerMetrics samzaConsumerMetrics; private final String clientId; private final String metricName; - /* package private */final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false); private final Config config; @@ -66,15 +66,16 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap. 
/* package private */ KafkaConsumerMessageSink messageSink; + // proxy is doing the actual reading private KafkaConsumerProxy proxy; /* package private */final Map<TopicPartition, String> topicPartitions2Offset = new HashMap<>(); + /* package private */final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); + /* package private */ long perPartitionFetchThreshold; /* package private */ long perPartitionFetchThresholdBytes; - // TODO - consider new class for KafkaSystemConsumerMetrics - /** * @param systemName * @param config @@ -85,32 +86,28 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements super(metrics.registry(), clock, metrics.getClass().getName()); + this.kafkaConsumer = kafkaConsumer; this.samzaConsumerMetrics = metrics; this.clientId = clientId; this.systemName = systemName; this.config = config; this.metricName = systemName + " " + clientId; - this.kafkaConsumer = kafkaConsumer; - this.fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName); - LOG.info(String.format( - "Created SamzaKafkaSystemConsumer for system=%s, clientId=%s, metricName=%s with KafkaConsumer=%s", systemName, - clientId, metricName, this.kafkaConsumer.toString())); + LOG.info("Created SamzaKafkaSystemConsumer for system={}, clientId={}, metricName={}, KafkaConsumer={}", systemName, + clientId, metricName, this.kafkaConsumer.toString()); } public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(String systemName, Config config, String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) { - - // extract consumer configs and create kafka consumer KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config); - + LOG.info("Created kafka consumer for system {}, clientId {}: {}", systemName, clientId, kafkaConsumer); NewKafkaSystemConsumer kc = new NewKafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock); - 
System.out.println("kc=" + kc + "!!!!!!!!!!!!!!!!!GETTING FOR NKC for " + systemName); + LOG.info("Created samza system consumer {}", kc.toString()); return kc; } @@ -126,12 +123,11 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Map<String, String> injectProps = new HashMap<>(); - // extract kafka consumer configs + // extract kafka client configs KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps); - LOG.info("==============>Consumer properties in getKafkaConsumerImpl: systemName: {}, consumerProperties: {}", - systemName, consumerConfig.originals()); + LOG.info("KafkaClient properties for systemName {}: {}", systemName, consumerConfig.originals()); return new KafkaConsumer<>(consumerConfig.originals()); } @@ -146,29 +142,23 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements LOG.warn("attempting to start a stopped consumer"); return; } - LOG.info("==============>About to start consumer"); // initialize the subscriptions for all the registered TopicPartitions startSubscription(); - LOG.info("==============>subscription started"); // needs to be called after all the registrations are completed setFetchThresholds(); - LOG.info("==============>thresholds ste"); // Create the proxy to do the actual message reading. It is a separate thread that reads the messages from the stream // and puts them into the sink. 
createConsumerProxy(); - LOG.info("==============>proxy started"); startConsumer(); - LOG.info("==============>consumer started"); + LOG.info("consumer {} started", this); } private void startSubscription() { - //subscribe to all the TopicPartitions - LOG.info("==============>startSubscription for TP: " + topicPartitions2SSP.keySet()); + //subscribe to all the registered TopicPartitions + LOG.info("consumer {}, subscribes to {} ", this, topicPartitions2SSP.keySet()); try { synchronized (kafkaConsumer) { // we are using assign (and not subscribe), so we need to specify both topic and partition - //topicPartitions2SSP.put(new TopicPartition("FAKE PARTITION", 0), new SystemStreamPartition("Some","Another", new Partition(0))); - //topicPartitions2Offset.put(new TopicPartition("FAKE PARTITION", 0), "1234"); kafkaConsumer.assign(topicPartitions2SSP.keySet()); } } catch (Exception e) { @@ -184,7 +174,7 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements // create the thread with the consumer proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, samzaConsumerMetrics, metricName); - LOG.info("==============>Created consumer proxy: " + proxy); + LOG.info("Created consumer proxy: " + proxy); } /* @@ -194,6 +184,10 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements */ void startConsumer() { //set the offset for each TopicPartition + if (topicPartitions2Offset.size() <= 0) { + LOG.warn("Consumer {} is not subscribed to any SSPs", this); + } + topicPartitions2Offset.forEach((tp, startingOffsetString) -> { long startingOffset = Long.valueOf(startingOffsetString); @@ -209,16 +203,15 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements throw new SamzaException(e); } - LOG.info("==============>Changing Consumer's position for tp = " + tp + " to " + startingOffsetString); + LOG.info("Changing consumer's starting offset for tp = " + tp + " to " + 
startingOffsetString); // add the partition to the proxy proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset); }); - System.out.println("#####################started " + this + "; kc=" + kafkaConsumer); // start the proxy thread if (proxy != null && !proxy.isRunning()) { - System.out.println("#####################starting proxy " + proxy); + LOG.info("Starting proxy: " + proxy); proxy.start(); } } @@ -226,29 +219,34 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements private void setFetchThresholds() { // get the thresholds, and set defaults if not defined. KafkaConfig kafkaConfig = new KafkaConfig(config); + Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName); long fetchThreshold = FETCH_THRESHOLD; if (fetchThresholdOption.isDefined()) { fetchThreshold = Long.valueOf(fetchThresholdOption.get()); - LOG.info("fetchThresholdOption is defined. fetchThreshold=" + fetchThreshold); + LOG.info("fetchThresholdOption is configured. fetchThreshold=" + fetchThreshold); } + Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName); long fetchThresholdBytes = FETCH_THRESHOLD_BYTES; if (fetchThresholdBytesOption.isDefined()) { fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get()); - LOG.info("fetchThresholdBytesOption is defined. fetchThresholdBytes=" + fetchThresholdBytes); + LOG.info("fetchThresholdBytesOption is configured. 
fetchThresholdBytes=" + fetchThresholdBytes); } + + int numTPs = topicPartitions2SSP.size(); + assert (numTPs == topicPartitions2Offset.size()); + LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; fetchThreshold=" + fetchThreshold); - LOG.info("topicPartitions2Offset #=" + topicPartitions2Offset.size() + "; topicPartition2SSP #=" - + topicPartitions2SSP.size()); + LOG.info("number of topicPartitions " + numTPs); - if (topicPartitions2SSP.size() > 0) { - perPartitionFetchThreshold = fetchThreshold / topicPartitions2SSP.size(); + if (numTPs > 0) { + perPartitionFetchThreshold = fetchThreshold / numTPs; LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold); if (fetchThresholdBytesEnabled) { // currently this feature cannot be enabled, because we do not have the size of the messages available. // messages get double buffered, hence divide by 2 - perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitions2SSP.size(); + perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs; LOG.info("perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes=" + perPartitionFetchThresholdBytes); } @@ -257,23 +255,22 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements @Override public void stop() { - System.out.println("kc=" + this + "!!!!!!!!!!!!!!!!!!!!!! 
stopping "+ "; kc=" + kafkaConsumer); - System.out.println("kc=" + this + "!!!!!!!!!!!!!!!!!!!!!!TPs = " + topicPartitions2Offset); + LOG.info("Stopping Samza kafkaConsumer " + this); if (!stopped.compareAndSet(false, true)) { LOG.warn("attempting to stop stopped consumer."); return; } - LOG.warn("Stopping SamzaRawLiKafkaConsumer + " + this); // stop the proxy (with 5 minutes timeout) if (proxy != null) { - System.out.println("##################### stopping proxy " + proxy); + LOG.info("Stopping proxy " + proxy); proxy.stop(TimeUnit.MINUTES.toMillis(5)); } try { synchronized (kafkaConsumer) { + LOG.info("Closing kafka consumer " + kafkaConsumer); kafkaConsumer.close(); } } catch (Exception e) { @@ -304,7 +301,7 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements topicPartitions2SSP.put(tp, systemStreamPartition); - LOG.info("============>registering ssp = " + systemStreamPartition + " with offset " + offset + "; kc=" + this); + LOG.info("Registering ssp = " + systemStreamPartition + " with offset " + offset); String existingOffset = topicPartitions2Offset.get(tp); // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages. 
@@ -328,7 +325,7 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements @Override public String toString() { - return systemName + " " + clientId + "/" + super.toString(); + return systemName + "/" + clientId + "/" + super.toString(); } @Override @@ -339,21 +336,15 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements if (!proxy.isRunning()) { stop(); if (proxy.getFailureCause() != null) { - String message = "LiKafkaConsumerProxy has stopped"; - if (proxy.getFailureCause() instanceof org.apache.kafka.common.errors.TopicAuthorizationException) { - message += - " due to TopicAuthorizationException Please refer to go/samzaacluserguide to correctly set up acls for your topic"; - } + String message = "KafkaConsumerProxy has stopped"; throw new SamzaException(message, proxy.getFailureCause()); } else { - LOG.warn("Failure cause not populated for LiKafkaConsumerProxy"); + LOG.warn("Failure cause is not populated for KafkaConsumerProxy"); throw new SamzaException("KafkaConsumerProxy has stopped"); } } Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout); - //LOG.info("=============================>. Res for " + systemStreamPartitions); - //LOG.info("=============================>.
Res:" + res.toString()); return res; } @@ -399,14 +390,14 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements } if (fetchThresholdBytesEnabled) { - return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes; // TODO Validate + return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes; } else { return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold; } } void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) { - LOG.info("==============>Incoming message ssp = {}: envelope = {}.", ssp, envelope); + LOG.trace("Incoming message ssp = {}: envelope = {}.", ssp, envelope); try { put(ssp, envelope); http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala deleted file mode 100644 index a3f76e7..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ /dev/null @@ -1,437 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.samza.system.kafka - -import java.nio.ByteBuffer -import java.util.concurrent.CountDownLatch - -import kafka.api.{PartitionOffsetsResponse, _} -import kafka.common.TopicAndPartition -import kafka.consumer.SimpleConsumer -import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet} -import org.apache.kafka.common.protocol.Errors -import org.apache.samza.SamzaException -import org.apache.samza.util.Logging -import org.junit.Assert._ -import org.junit._ -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer -import org.mockito.{Matchers, Mockito} - -import scala.collection.JavaConverters._ - -class TestBrokerProxy extends Logging { - /* - val tp2 = new TopicAndPartition("Redbird", 2013) - var fetchTp1 = true // control whether fetching tp1 messages or not - - @Test def brokerProxyRetrievesMessagesCorrectly() = { - val (bp, tp, sink) = getMockBrokerProxy() - - bp.start - bp.addTopicPartition(tp, Option("0")) - // Add tp2, which should never receive messages since sink disables it. - bp.addTopicPartition(tp2, Option("0")) - Thread.sleep(1000) - assertEquals(2, sink.receivedMessages.size) - assertEquals(42, sink.receivedMessages(0)._2.offset) - assertEquals(84, sink.receivedMessages(1)._2.offset) - } - - @Test def brokerProxySkipsFetchForEmptyRequests() = { - val (bp, tp, sink) = getMockBrokerProxy() - - bp.start - // Only add tp2, which should never receive messages since sink disables it. 
- bp.addTopicPartition(tp2, Option("0")) - Thread.sleep(1000) - assertEquals(0, sink.receivedMessages.size) - assertTrue(bp.metrics.brokerSkippedFetchRequests.get((bp.host, bp.port)).getCount > 0) - assertEquals(0, bp.metrics.brokerReads.get((bp.host, bp.port)).getCount) - } - - @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = { - val (bp, tp, _) = getMockBrokerProxy() - bp.start - bp.addTopicPartition(tp, Option("0")) - - try { - bp.addTopicPartition(tp, Option("1")) - fail("Should have thrown an exception") - } catch { - case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]") - case other: Exception => fail("Got some other exception than what we were expecting: " + other) - } - } - - def getMockBrokerProxy() = { - val sink = new MessageSink { - val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]() - - def abdicate(tp: TopicAndPartition, nextOffset: Long) {} - - def refreshDropped() {} - - def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { - receivedMessages += ((tp, msg, msg.offset.equals(highWatermark))) - } - - def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) { - } - - // Never need messages for tp2. 
- def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2) && fetchTp1 - } - - val system = "daSystem" - val host = "host" - val port = 2222 - val tp = new TopicAndPartition("Redbird", 2012) - val metrics = new KafkaSystemConsumerMetrics(system) - - metrics.registerBrokerProxy(host, port) - metrics.registerTopicAndPartition(tp) - metrics.topicPartitions.get((host, port)).set(1) - - val bp = new BrokerProxy( - host, - port, - system, - "daClientId", - metrics, - sink, - offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) { - - override val sleepMSWhileNoTopicPartitions = 100 - // Speed up for test - var alreadyCreatedConsumer = false - - // Scala traits and Mockito mocks don't mix, unfortunately. - override def createSimpleConsumer() = { - if (alreadyCreatedConsumer) { - System.err.println("Should only be creating one consumer in this test!") - throw new InterruptedException("Should only be creating one consumer in this test!") - } - alreadyCreatedConsumer = true - - new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", new StreamFetchSizes(42)) { - val sc = Mockito.mock(classOf[SimpleConsumer]) - val mockOffsetResponse = { - val offsetResponse = Mockito.mock(classOf[OffsetResponse]) - val partitionOffsetResponse = { - val por = Mockito.mock(classOf[PartitionOffsetsResponse]) - when(por.offsets).thenReturn(List(1l).toSeq) - por - } - - val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse, tp2 -> partitionOffsetResponse) - when(offsetResponse.partitionErrorAndOffsets).thenReturn(map) - offsetResponse - } - - when(sc.getOffsetsBefore(any(classOf[OffsetRequest]))).thenReturn(mockOffsetResponse) - - val fetchResponse = { - val fetchResponse = Mockito.mock(classOf[FetchResponse]) - - val messageSet = { - val messageSet = Mockito.mock(classOf[ByteBufferMessageSet]) - - def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer])) - val messages = List(new MessageAndOffset(getMessage, 42), new 
MessageAndOffset(getMessage, 84)) - - when(messageSet.sizeInBytes).thenReturn(43) - when(messageSet.size).thenReturn(44) - when(messageSet.iterator).thenReturn(messages.iterator) - when(messageSet.head).thenReturn(messages.head) - messageSet - } - - val fetchResponsePartitionData = FetchResponsePartitionData(Errors.NONE, 500, messageSet) - val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData) - - when(fetchResponse.data).thenReturn(map.toSeq) - when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet) - fetchResponse - } - when(sc.fetch(any(classOf[FetchRequest]))).thenReturn(fetchResponse) - - override def close() = sc.close() - - override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request) - - override def fetch(request: FetchRequest): FetchResponse = { - // Verify that we only get fetch requests for one tp, even though - // two were registered. This is to verify that - // sink.needsMoreMessages works. 
- assertEquals(1, request.requestInfo.size) - sc.fetch(request) - } - - when(sc.earliestOrLatestOffset(any(classOf[TopicAndPartition]), any(classOf[Long]), any(classOf[Int]))).thenReturn(100) - - override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = sc.getOffsetsBefore(request) - - override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = sc.commitOffsets(request) - - override def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse = sc.fetchOffsets(request) - - override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = sc.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId) - } - } - - } - - (bp, tp, sink) - } - - @Test def brokerProxyUpdateLatencyMetrics() = { - val (bp, tp, _) = getMockBrokerProxy() - - bp.start - bp.addTopicPartition(tp, Option("0")) - Thread.sleep(1000) - // update when fetching messages - assertEquals(500, bp.metrics.highWatermark.get(tp).getValue) - assertEquals(415, bp.metrics.lag.get(tp).getValue) - - fetchTp1 = false - Thread.sleep(1000) - // update when not fetching messages - assertEquals(100, bp.metrics.highWatermark.get(tp).getValue) - assertEquals(15, bp.metrics.lag.get(tp).getValue) - - fetchTp1 = true - } - - @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange(): Unit = { - // Need to wait for the thread to do some work before ending the test - val countdownLatch = new CountDownLatch(1) - var failString: String = null - - val mockMessageSink = mock(classOf[MessageSink]) - when(mockMessageSink.needsMoreMessages(any())).thenReturn(true) - - val doNothingMetrics = new KafkaSystemConsumerMetrics() - - val tp = new TopicAndPartition("topic", 42) - - val mockOffsetGetter = mock(classOf[GetOffset]) - // This will be used by the simple consumer below, and this is the response that simple consumer needs - when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), 
Matchers.eq("0"))).thenReturn(true) - when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l) - - var callsToCreateSimpleConsumer = 0 - val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer]) - - // Create an answer that first indicates offset out of range on first invocation and on second - // verifies that the parameters have been updated to what we expect them to be - val answer = new Answer[FetchResponse]() { - var invocationCount = 0 - - def answer(invocation: InvocationOnMock): FetchResponse = { - val arguments = invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(String, Long)] - - if (invocationCount == 0) { - if (arguments !=(tp, 0)) { - failString = "First invocation did not have the right arguments: " + arguments - countdownLatch.countDown() - } - val mfr = mock(classOf[FetchResponse]) - when(mfr.hasError).thenReturn(true) - when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE) - - val messageSet = mock(classOf[MessageSet]) - when(messageSet.iterator).thenReturn(Iterator.empty) - val response = mock(classOf[FetchResponsePartitionData]) - when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE) - val responseMap = Map(tp -> response) - when(mfr.data).thenReturn(responseMap.toSeq) - invocationCount += 1 - mfr - } else { - if (arguments !=(tp, 1492)) { - failString = "On second invocation, arguments were not correct: " + arguments - } - countdownLatch.countDown() - Thread.currentThread().interrupt() - null - } - } - } - - when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer) - - // So now we have a fetch response that will fail. 
Prime the mockGetOffset to send us to a new offset - - val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { - - override def createSimpleConsumer() = { - if (callsToCreateSimpleConsumer > 1) { - failString = "Tried to create more than one simple consumer" - countdownLatch.countDown() - } - callsToCreateSimpleConsumer += 1 - mockSimpleConsumer - } - } - - bp.addTopicPartition(tp, Option("0")) - bp.start - countdownLatch.await() - bp.stop - if (failString != null) { - fail(failString) - } - } - - /** - * TODO fix - * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions - * that it owns when a consumer failure occurs. - */ - @Test def brokerProxyAbdicatesOnConnectionFailure(): Unit = { - val countdownLatch = new CountDownLatch(1) - var abdicated: Option[TopicAndPartition] = None - @volatile var refreshDroppedCount = 0 - val mockMessageSink = new MessageSink { - override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) { - } - - override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) { - } - - override def abdicate(tp: TopicAndPartition, nextOffset: Long) { - abdicated = Some(tp) - countdownLatch.countDown - } - - override def refreshDropped() { - refreshDroppedCount += 1 - } - - override def needsMoreMessages(tp: TopicAndPartition): Boolean = { - true - } - } - - val doNothingMetrics = new KafkaSystemConsumerMetrics() - val tp = new TopicAndPartition("topic", 42) - val mockOffsetGetter = mock(classOf[GetOffset]) - val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer]) - - when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true) - when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l) - 
when(mockSimpleConsumer.defaultFetch(any())).thenThrow(new SamzaException("Pretend this is a ClosedChannelException. Can't use ClosedChannelException because it's checked, and Mockito doesn't like that.")) - - val bp = new BrokerProxy("host", 567, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { - override def createSimpleConsumer() = { - mockSimpleConsumer - } - } - - val waitForRefresh = () => { - val currentRefreshDroppedCount = refreshDroppedCount - while (refreshDroppedCount == currentRefreshDroppedCount) { - Thread.sleep(100) - } - } - - bp.addTopicPartition(tp, Option("0")) - bp.start - // BP should refresh on startup. - waitForRefresh() - countdownLatch.await() - // BP should continue refreshing after it's abdicated all TopicAndPartitions. - waitForRefresh() - bp.stop - assertEquals(tp, abdicated.getOrElse(null)) - } - - @Test def brokerProxyAbdicatesHardErrors(): Unit = { - val doNothingMetrics = new KafkaSystemConsumerMetrics - val mockMessageSink = new MessageSink { - override def needsMoreMessages(tp: TopicAndPartition): Boolean = true - override def abdicate(tp: TopicAndPartition, nextOffset: Long) {} - override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {} - override def refreshDropped() {throw new OutOfMemoryError("Test - OOME")} - override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {} - } - val mockOffsetGetter = mock(classOf[GetOffset]) - val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer]) - - val bp = new BrokerProxy("host", 658, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { - override def createSimpleConsumer() = { - mockSimpleConsumer - } - } - var caughtError = false - try { - bp.thread.run - } catch { - case e: SamzaException => { - 
assertEquals(e.getMessage, "Got out of memory error in broker proxy thread.") - info("Received OutOfMemoryError in broker proxy.") - caughtError = true - } - } - assertEquals(true, caughtError) - val mockMessageSink2 = new MessageSink { - override def needsMoreMessages(tp: TopicAndPartition): Boolean = true - override def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit = {} - override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit = {} - override def refreshDropped(): Unit = {throw new StackOverflowError("Test - SOE")} - override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {} - } - caughtError = false - val bp2 = new BrokerProxy("host", 689, "system", "clientID2", doNothingMetrics, mockMessageSink2, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { - override def createSimpleConsumer() = { - mockSimpleConsumer - } - } - try { - bp2.thread.run - } catch { - case e: SamzaException => { - assertEquals(e.getMessage, "Got stack overflow error in broker proxy thread.") - info("Received StackOverflowError in broker proxy.") - caughtError = true - } - } - assertEquals(true, caughtError) - } - - @Test - def brokerProxyStopCloseConsumer: Unit = { - val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer]) - val bp = new BrokerProxy("host", 0, "system", "clientID", new KafkaSystemConsumerMetrics(), null){ - override def createSimpleConsumer() = { - mockSimpleConsumer - } - } - bp.start - bp.stop - verify(mockSimpleConsumer).close - } - */ -} http://git-wip-us.apache.org/repos/asf/samza/blob/332a0481/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index 
2ea9a5f..8405c63 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -223,16 +223,16 @@ class StreamTaskTestUtil { * interrupt, which is forwarded on to ThreadJob, and marked as a failure). */ def stopJob(job: StreamJob) { - // make sure we don't kill the job before it was started + // make sure we don't kill the job before it was started. + // eventProcesses guarantees all the consumers have been initialized val tasks = TestTask.tasks val task = tasks.values.toList.head task.eventProcessed.await(60, TimeUnit.SECONDS) - System.out.println("THREAD: JOB KILL BEFORE") + assertEquals(0, task.eventProcessed.getCount) + // Shutdown task. job.kill - System.out.println("THREAD: JOB KILL") val status = job.waitForFinish(60000) - System.out.println("THREAD: JOB KILL WAIT") assertEquals(ApplicationStatus.UnsuccessfulFinish, status) }
