Added new KafkaProxy and KafkaConsumer for default KafkaSystem
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72544606 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72544606 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72544606 Branch: refs/heads/NewKafkaSystemConsumer Commit: 72544606bfffc67aeaa7f509ca54cfd6db52e2b4 Parents: 4801709 Author: Boris S <[email protected]> Authored: Fri Aug 17 18:08:52 2018 -0700 Committer: Boris S <[email protected]> Committed: Fri Aug 17 18:08:52 2018 -0700 ---------------------------------------------------------------------- .../clients/consumer/KafkaConsumerConfig.java | 152 ++++++ .../samza/system/kafka/KafkaConsumerProxy.java | 463 +++++++++++++++++++ .../samza/system/kafka/KafkaSystemFactory.scala | 54 ++- .../system/kafka/NewKafkaSystemConsumer.java | 403 ++++++++++++++++ .../kafka/TestKafkaCheckpointManager.scala | 8 +- 5 files changed, 1064 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/72544606/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java new file mode 100644 index 0000000..97360e2 --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java @@ -0,0 +1,152 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.kafka.clients.consumer; + +import java.util.Map; +import java.util.Properties; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; +import org.apache.samza.config.JobConfig; +import scala.Option; + + +/** + * The configuration class for KafkaConsumer + */ +public class KafkaConsumerConfig extends ConsumerConfig { + + private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer"; + private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer"; + private static final String SAMZA_OFFSET_LARGEST = "largest"; + private static final String SAMZA_OFFSET_SMALLEST = "smallest"; + private static final String KAFKA_OFFSET_LATEST = "latest"; + private static final String KAFKA_OFFSET_EARLIEST = "earliest"; + /* + * By default, KafkaConsumer will fetch ALL available messages for all the partitions. + * This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll(). 
+ */ + private static final String KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT = "100"; + + + public KafkaConsumerConfig(Properties props) { + super(props); + } + + public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, + String systemName, String clientId, Map<String, String> injectProps) { + + Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true); + + String groupId = getConsumerGroupId(config); + + Properties consumerProps = new Properties(); + consumerProps.putAll(subConf); + + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId); + + /******************************************** + * Open-source Kafka Consumer configuration * + *******************************************/ + consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Disable consumer auto-commit + + consumerProps.setProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + getAutoOffsetResetValue(consumerProps)); // Translate samza config value to kafka config value + + // makesure bootstrap configs are in ?? SHOULD WE FAIL IF THEY ARE NOT? + if (! subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + // get it from the producer config + String bootstrapServer = config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + } + + // Always use default partition assignment strategy. Do not allow override. 
+ consumerProps.setProperty( + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, + RangeAssignor.class.getName()); + + + // NOT SURE THIS IS NEEDED TODO + String maxPollRecords = subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT);; + consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); + + // put overrides + consumerProps.putAll(injectProps); + + return new KafkaConsumerConfig(consumerProps); + } + + // group id should be unique per job + static String getConsumerGroupId(Config config) { + JobConfig jobConfig = new JobConfig(config); + Option<String> jobIdOption = jobConfig.getJobId(); + Option<String> jobNameOption = jobConfig.getName(); + return (jobNameOption.isDefined()? jobNameOption.get() : "undefined_job_name") + "-" + + (jobIdOption.isDefined()? jobIdOption.get() : "undefined_job_id"); + } + // client id should be unique per job + public static String getClientId(String id, Config config) { + if (config.get(JobConfig.JOB_NAME()) == null) { + throw new ConfigException("Missing job name"); + } + String jobName = config.get(JobConfig.JOB_NAME()); + String jobId = "1"; + if (config.get(JobConfig.JOB_ID()) != null) { + jobId = config.get(JobConfig.JOB_ID()); + } + return getClientId(id, jobName, jobId); + } + + private static String getClientId(String id, String jobName, String jobId) { + return String.format( + "%s-%s-%s", + id.replaceAll("[^A-Za-z0-9]", "_"), + jobName.replaceAll("[^A-Za-z0-9]", "_"), + jobId.replaceAll("[^A-Za-z0-9]", "_")); + } + + public static String getProducerClientId(Config config) { + return getClientId(PRODUCER_CLIENT_ID_PREFIX, config); + } + + /** + * Settings for auto.reset in samza are different from settings in Kafka (auto.offset.reset) - need to convert + * "largest" -> "latest" + * "smallest" -> "earliest" + * "none" - will fail the kafka consumer, if offset is out of range + * @param properties All consumer related {@link Properties} parsed from samza 
config + * @return String representing the config value for "auto.offset.reset" property + */ + static String getAutoOffsetResetValue(Properties properties) { + String autoOffsetReset = properties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KAFKA_OFFSET_LATEST); + switch (autoOffsetReset) { + case SAMZA_OFFSET_LARGEST: + return KAFKA_OFFSET_LATEST; + case SAMZA_OFFSET_SMALLEST: + return KAFKA_OFFSET_EARLIEST; + default: + return KAFKA_OFFSET_LATEST; + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/72544606/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java new file mode 100644 index 0000000..66971af --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -0,0 +1,463 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.samza.system.kafka; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import kafka.common.KafkaException; +import kafka.common.TopicAndPartition; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.InvalidOffsetException; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.samza.SamzaException; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Separate thread that reads messages from kafka and puts them int the BlockingEnvelopeMap + * This class is not thread safe. There will be only one instance of this class per LiKafkaSystemConsumer object + * We still need some synchronization around kafkaConsumer. See pollConsumer() method for details. 
+ */ +public class KafkaConsumerProxy<K, V> { + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProxy.class); + + private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100; + + /* package private */ final Thread consumerPollThread; + private final Consumer<K, V> kafkaConsumer; + private final NewKafkaSystemConsumer.KafkaConsumerMessageSink sink; + private final KafkaSystemConsumerMetrics kafkaConsumerMetrics; + private final String metricName; + private final String systemName; + private final String clientId; + private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); + private final Map<SystemStreamPartition, MetricName> ssp2MetricName = new HashMap<>(); + // list of all the SSPs we poll from with their next offsets correspondingly. + private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>(); + // lags behind the high water mark, as reported by the Kafka consumer. + private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>(); + private final NewKafkaSystemConsumer.ValueUnwrapper<V> valueUnwrapper; + + private volatile boolean isRunning = false; + private volatile Throwable failureCause = null; + private CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1); + + public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId, + NewKafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics, + String metricName, NewKafkaSystemConsumer.ValueUnwrapper<V> valueUnwrapper) { + + this.kafkaConsumer = kafkaConsumer; + this.systemName = systemName; + this.sink = messageSink; + this.kafkaConsumerMetrics = samzaConsumerMetrics; + this.metricName = metricName; + this.clientId = clientId; + this.valueUnwrapper = valueUnwrapper; + + // TODO - see if we need new metrics (not host:port based) + this.kafkaConsumerMetrics.registerBrokerProxy(metricName, 0); + + consumerPollThread = new 
Thread(createProxyThreadRunnable()); + } + + public void start() { + if (!consumerPollThread.isAlive()) { + LOG.info("Starting LiKafkaConsumerProxy polling thread for system " + systemName + " " + this.toString()); + consumerPollThread.setDaemon(true); + consumerPollThread.setName( + "Samza LiKafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName); + consumerPollThread.start(); + + // we need to wait until the thread starts + while (!isRunning) { + try { + consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + } + } else { + LOG.debug("Tried to start an already started LiKafkaConsumerProxy (%s). Ignoring.", this.toString()); + } + } + + // add new partition to the list of polled partitions + // this method is called only at the beginning, before the thread is started + public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) { + LOG.info(String.format("Adding new topic and partition %s, offset = %s to queue for consumer %s", ssp, nextOffset, + this)); + topicPartitions2SSP.put(NewKafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs + + // this is already vetted offset so there is no need to validate it + LOG.info(String.format("Got offset %s for new topic and partition %s.", nextOffset, ssp)); + + nextOffsets.put(ssp, nextOffset); + + // we reuse existing metrics. 
They assume host and port for the broker + // for now fake the port with the consumer name + kafkaConsumerMetrics.setTopicPartitionValue(metricName, 0, nextOffsets.size()); + } + + /** + * creates a separate thread for pulling messages + */ + private Runnable createProxyThreadRunnable() { + return () -> { + isRunning = true; + + try { + consumerPollThreadStartLatch.countDown(); + initializeLags(); + while (isRunning) { + fetchMessages(); + } + } catch (Throwable throwable) { + LOG.error(String.format("Error in LiKafkaConsumerProxy poll thread for system: %s.", systemName), throwable); + // SamzaLiKafkaSystemConsumer uses the failureCause to propagate the throwable to the container + failureCause = throwable; + isRunning = false; + } + + if (!isRunning) { + LOG.info("Stopping the LiKafkaConsumerProxy poll thread for system: {}.", systemName); + } + }; + } + + private void initializeLags() { + // This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag. + Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions2SSP.keySet()); + endOffsets.forEach((tp, offset) -> { + SystemStreamPartition ssp = topicPartitions2SSP.get(tp); + long startingOffset = nextOffsets.get(ssp); + // End offsets are the offset of the newest message + 1 + // If the message we are about to consume is < end offset, we are starting with a lag. 
+ long initialLag = endOffsets.get(tp) - startingOffset; + + LOG.info("Initial lag is {} for SSP {}", initialLag, ssp); + latestLags.put(ssp, initialLag); + sink.setIsAtHighWatermark(ssp, initialLag == 0); + }); + + // initialize lag metrics + refreshLatencyMetrics(); + } + + // the actual polling of the messages from kafka + public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer( + Set<SystemStreamPartition> systemStreamPartitions, long timeout) { + + if (topicPartitions2SSP.size() == 0) { + throw new SamzaException("cannot poll empty set of TopicPartitions"); + } + + // Since we need to poll only from some subset of TopicPartitions (passed as the argument), + // we need to pause the rest. + List<TopicPartition> topicPartitionsToPause = new ArrayList<>(); + List<TopicPartition> topicPartitionsToPoll = new ArrayList<>(); + + for (Map.Entry<TopicPartition, SystemStreamPartition> e : topicPartitions2SSP.entrySet()) { + TopicPartition tp = e.getKey(); + SystemStreamPartition ssp = e.getValue(); + if (systemStreamPartitions.contains(ssp)) { + topicPartitionsToPoll.add(tp); // consume + } else { + topicPartitionsToPause.add(tp); // ignore + } + } + + ConsumerRecords<K, V> records; + // make a call on the client + try { + // Currently, when doing checkpoint we are making a safeOffset request through this client, thus we need to synchronize + // them. In the future we may use this client for the actually checkpointing. 
+ synchronized (kafkaConsumer) { + // Since we are not polling from ALL the subscribed topics, so we need to "change" the subscription temporarily + kafkaConsumer.pause(topicPartitionsToPause); + kafkaConsumer.resume(topicPartitionsToPoll); + records = kafkaConsumer.poll(timeout); + // resume original set of subscription - may be required for checkpointing + kafkaConsumer.resume(topicPartitionsToPause); + } + } catch (InvalidOffsetException e) { + LOG.error("LiKafkaConsumer with invalidOffsetException", e); + // If the consumer has thrown this exception it means that auto reset is not set for this consumer. + // So we just rethrow. + LOG.error("Caught InvalidOffsetException in pollConsumer", e); + throw e; + } catch (KafkaException e) { + // we may get InvalidOffsetException | AuthorizationException | KafkaException exceptions, + // but we still just rethrow, and log it up the stack. + LOG.error("Caught a Kafka exception in pollConsumer", e); + throw e; + } + + return processResults(records); + } + + private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) { + if (records == null) { + return Collections.emptyMap(); + } + + int capacity = (int) (records.count() / 0.75 + 1); // to avoid rehash, allocate more then 75% of expected capacity. + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(capacity); + // Parse the returned records and convert them into the IncomingMessageEnvelope. + // Note. They have been already de-serialized by the consumer. 
+ for (ConsumerRecord<K, V> r : records) { + int partition = r.partition(); + String topic = r.topic(); + TopicPartition tp = new TopicPartition(topic, partition); + + updateMetrics(r, tp); + + SystemStreamPartition ssp = topicPartitions2SSP.get(tp); + List<IncomingMessageEnvelope> listMsgs = results.get(ssp); + if (listMsgs == null) { + listMsgs = new ArrayList<>(); + results.put(ssp, listMsgs); + } + + // TODO - add calculation of the size of the message, when available from Kafka + int msgSize = 0; + // if (fetchLimitByBytesEnabled) { + msgSize = getRecordSize(r); + //} + + final K key = r.key(); + final Object value = + valueUnwrapper == null ? r.value() : valueUnwrapper.unwrapValue(ssp.getSystemStream(), r.value()); + IncomingMessageEnvelope imEnvelope = + new IncomingMessageEnvelope(ssp, String.valueOf(r.offset()), key, value, msgSize); + listMsgs.add(imEnvelope); + } + if (LOG.isDebugEnabled()) { + LOG.debug("# records per SSP:"); + for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : results.entrySet()) { + List<IncomingMessageEnvelope> list = e.getValue(); + LOG.debug(e.getKey() + " = " + ((list == null) ? 0 : list.size())); + } + } + + return results; + } + + private int getRecordSize(ConsumerRecord<K, V> r) { + int keySize = 0; //(r.key() == null) ? 0 : r.key().getSerializedKeySize(); + return keySize; // + r.getSerializedMsgSize(); // TODO -enable when functionality available from Kafka + + //int getMessageSize (Message message) { + // Approximate additional shallow heap overhead per message in addition to the raw bytes + // received from Kafka 4 + 64 + 4 + 4 + 4 = 80 bytes overhead. + // As this overhead is a moving target, and not very large + // compared to the message size its being ignore in the computation for now. 
+ // int MESSAGE_SIZE_OVERHEAD = 4 + 64 + 4 + 4 + 4; + + // return message.size() + MESSAGE_SIZE_OVERHEAD; + // } + } + + private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) { + TopicAndPartition tap = NewKafkaSystemConsumer.toTopicAndPartition(tp); + SystemStreamPartition ssp = NewKafkaSystemConsumer.toSystemStreamPartition(systemName, tap); + long currentSSPLag = getLatestLag(ssp); // lag between the current offset and the highwatermark + if (currentSSPLag < 0) { + return; + } + long recordOffset = r.offset(); + long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark + + int size = getRecordSize(r); + kafkaConsumerMetrics.incReads(tap); + kafkaConsumerMetrics.incBytesReads(tap, size); + kafkaConsumerMetrics.setOffsets(tap, recordOffset); + kafkaConsumerMetrics.incBrokerBytesReads(metricName, 0, size); + kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark); + } + + /* + This method put messages into blockingEnvelopeMap. + */ + private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) { + long nextOffset = nextOffsets.get(ssp); + + for (IncomingMessageEnvelope env : envelopes) { + sink.addMessage(ssp, env); // move message to the BlockingEnvelopeMap's queue + + LOG.trace("IncomingMessageEnvelope. 
got envelope with offset:{} for ssp={}", env.getOffset(), ssp); + nextOffset = Long.valueOf(env.getOffset()) + 1; + } + + nextOffsets.put(ssp, nextOffset); + } + + private void populateMetricNames(Set<SystemStreamPartition> ssps) { + HashMap<String, String> tags = new HashMap<>(); + tags.put("client-id", clientId);// this is required by the KafkaConsumer to get the metrics + + for (SystemStreamPartition ssp : ssps) { + TopicPartition tp = NewKafkaSystemConsumer.toTopicPartition(ssp); + ssp2MetricName.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags)); + } + } + + /* + The only way to figure out lag for the LiKafkaConsumer is to look at the metrics after each poll() call. + One of the metrics (records-lag) shows how far behind the HighWatermark the consumer is. + This method populates the lag information for each SSP into latestLags member variable. + */ + private void populateCurrentLags(Set<SystemStreamPartition> ssps) { + + Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics(); + + // populate the MetricNames first time + if (ssp2MetricName.isEmpty()) { + populateMetricNames(ssps); + } + + for (SystemStreamPartition ssp : ssps) { + MetricName mn = ssp2MetricName.get(ssp); + Metric currentLagM = consumerMetrics.get(mn); + + // In linkedin-kafka-client 5.*, high watermark is fixed to be the offset of last available message, + // so the lag is now at least 0, which is the same as Samza's definition. + // If the lag is not 0, then isAtHead is not true, and kafkaClient keeps polling. + long currentLag = (currentLagM != null) ? (long) currentLagM.value() : -1L; + /* + Metric averageLagM = consumerMetrics.get(new MetricName(tp + ".records-lag-avg", "consumer-fetch-manager-metrics", "", tags)); + double averageLag = (averageLagM != null) ? 
averageLagM.value() : -1.0; + Metric maxLagM = consumerMetrics.get(new MetricName(tp + ".records-lag-max", "consumer-fetch-manager-metrics", "", tags)); + double maxLag = (maxLagM != null) ? maxLagM.value() : -1.0; + */ + latestLags.put(ssp, currentLag); + + // calls the setIsAtHead for the BlockingEnvelopeMap + sink.setIsAtHighWatermark(ssp, currentLag == 0); + } + } + + /* + Get the latest lag for a specific SSP. + */ + public long getLatestLag(SystemStreamPartition ssp) { + Long lag = latestLags.get(ssp); + if (lag == null) { + throw new SamzaException("Unknown/unregistered ssp in latestLags request: " + ssp); + } + return lag; + } + + /* + Using the consumer to poll the messages from the stream. + */ + private void fetchMessages() { + Set<SystemStreamPartition> SSPsToFetch = new HashSet<>(); + for (SystemStreamPartition ssp : nextOffsets.keySet()) { + if (sink.needsMoreMessages(ssp)) { + SSPsToFetch.add(ssp); + } + } + LOG.debug("pollConsumer {}", SSPsToFetch.size()); + if (!SSPsToFetch.isEmpty()) { + kafkaConsumerMetrics.incBrokerReads(metricName, 0); + + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response; + if (LOG.isDebugEnabled()) { + LOG.debug("pollConsumer from following SSPs: {}; total#={}", SSPsToFetch, SSPsToFetch.size()); + } + response = pollConsumer(SSPsToFetch, 500); // TODO should be default value from ConsumerConfig + + // move the responses into the queue + for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) { + List<IncomingMessageEnvelope> envelopes = e.getValue(); + if (envelopes != null) { + moveMessagesToTheirQueue(e.getKey(), envelopes); + } + } + + populateCurrentLags(SSPsToFetch); // find current lags for for each SSP + } else { // nothing to read + + LOG.debug("No topic/partitions need to be fetched for consumer {} right now. 
Sleeping {}ms.", kafkaConsumer, + SLEEP_MS_WHILE_NO_TOPIC_PARTITION); + + kafkaConsumerMetrics.incBrokerSkippedFetchRequests(metricName, 0); + + try { + Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION); + } catch (InterruptedException e) { + LOG.warn("Sleep in fetchMessages was interrupted"); + } + } + refreshLatencyMetrics(); + } + + private void refreshLatencyMetrics() { + for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) { + SystemStreamPartition ssp = e.getKey(); + Long offset = e.getValue(); + TopicAndPartition tp = NewKafkaSystemConsumer.toTopicAndPartition(ssp); + Long lag = latestLags.get(ssp); + LOG.trace("Latest offset of {} is {}; lag = {}", ssp, offset, lag); + if (lag != null && offset != null && lag >= 0) { + long streamEndOffset = offset.longValue() + lag.longValue(); + // update the metrics + kafkaConsumerMetrics.setHighWatermarkValue(tp, streamEndOffset); + kafkaConsumerMetrics.setLagValue(tp, lag.longValue()); + } + } + } + + boolean isRunning() { + return isRunning; + } + + Throwable getFailureCause() { + return failureCause; + } + + public void stop(long timeout) { + LOG.info("Shutting down LiKafkaConsumerProxy poll thread:" + toString()); + + isRunning = false; + try { + consumerPollThread.join(timeout); + } catch (InterruptedException e) { + LOG.warn("Join in LiKafkaConsumerProxy has failed", e); + consumerPollThread.interrupt(); + } + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/72544606/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index 9f0b5f2..c7f6aed 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -19,16 +19,21 @@ package org.apache.samza.system.kafka +import java.util import java.util.Properties + +import kafka.consumer.ConsumerConfig import kafka.utils.ZkUtils +import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.samza.SamzaException import org.apache.samza.config.ApplicationConfig.ApplicationMode -import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore} -import org.apache.samza.config.{KafkaConfig, ApplicationConfig, StreamConfig, Config} +import org.apache.samza.util._ +import org.apache.samza.config.{ApplicationConfig, Config, KafkaConfig, StreamConfig} import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.config.KafkaConfig.Config2Kafka import org.apache.samza.config.TaskConfig.Config2Task import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.samza.system.SystemFactory import org.apache.samza.config.StorageConfig._ import org.apache.samza.system.SystemProducer @@ -53,21 +58,35 @@ class KafkaSystemFactory extends SystemFactory with Logging { // Kind of goofy to need a producer config for consumers, but we need metadata. 
val producerConfig = config.getKafkaSystemProducerConfig(systemName, clientId) val bootstrapServers = producerConfig.bootsrapServers - val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) + //val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) - val timeout = consumerConfig.socketTimeoutMs - val bufferSize = consumerConfig.socketReceiveBufferBytes - val fetchSize = new StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, config.getFetchMessageMaxBytesTopics(systemName)) - val consumerMinSize = consumerConfig.fetchMinBytes - val consumerMaxWait = consumerConfig.fetchWaitMaxMs - val autoOffsetResetDefault = consumerConfig.autoOffsetReset + //val kafkaConfig = new KafkaConfig(config) + + + // val timeout = consumerConfig.socketTimeoutMs + //val bufferSize = consumerConfig.socketReceiveBufferBytes + //val fetchSize = new StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, config.getFetchMessageMaxBytesTopics(systemName)) + //val consumerMinSize = consumerConfig.fetchMinBytes + //val consumerMaxWait = consumerConfig.fetchWaitMaxMs + //val autoOffsetResetDefault = consumerConfig.autoOffsetReset val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName) val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt val fetchThresholdBytes = config.getConsumerFetchThresholdBytes(systemName).getOrElse("-1").toLong - val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics) - val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout) + //val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics) + //val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout) - new KafkaSystemConsumer( + + val kafkaConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = + NewKafkaSystemConsumer.getKafkaConsumerImpl(systemName, clientId, config) + + def valueUnwrapper: 
NewKafkaSystemConsumer.ValueUnwrapper[Array[Byte]] = null;// TODO add real unrapper from + val kc = new NewKafkaSystemConsumer ( + kafkaConsumer, systemName, config, clientId, + metrics, new SystemClock, false, valueUnwrapper) + + kc + /* + new KafkaSystemConsumer( systemName = systemName, systemAdmin = getAdmin(systemName, config), metrics = metrics, @@ -82,7 +101,18 @@ class KafkaSystemFactory extends SystemFactory with Logging { fetchThresholdBytes = fetchThresholdBytes, fetchLimitByBytesEnabled = config.isConsumerFetchThresholdBytesEnabled(systemName), offsetGetter = offsetGetter) + */ + } + + /* + def getKafkaConsumerImpl(systemName: String, config: KafkaConfig) = { + info("Consumer properties in getKafkaConsumerImpl: systemName: {}, consumerProperties: {}", systemName, config) + + val byteArrayDeserializer = new ByteArrayDeserializer + new KafkaConsumer[Array[Byte], Array[Byte]](config.configForVanillaConsumer(), + byteArrayDeserializer, byteArrayDeserializer) } + */ def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = { val clientId = KafkaUtil.getClientId("samza-producer", config) http://git-wip-us.apache.org/repos/asf/samza/blob/72544606/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java new file mode 100644 index 0000000..26db610 --- /dev/null +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java @@ -0,0 +1,403 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.system.kafka; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import kafka.common.TopicAndPartition; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.KafkaConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.BlockingEnvelopeMap; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + + +public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements SystemConsumer{ + + private static final Logger LOG = 
LoggerFactory.getLogger(NewKafkaSystemConsumer.class); + + /** + * Provides a way to unwrap the value further. It is used for intermediate stream messages. + * @param <T> value type + */ + public interface ValueUnwrapper<T> { + Object unwrapValue(SystemStream systemStream, T value); + } + + private static final long FETCH_THRESHOLD = 50000; + private static final long FETCH_THRESHOLD_BYTES = -1L; + private final Consumer<K,V> kafkaConsumer; + private final String systemName; + private final KafkaSystemConsumerMetrics samzaConsumerMetrics; + private final String clientId; + private final String metricName; + private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); + private final AtomicBoolean stopped = new AtomicBoolean(false); + private final AtomicBoolean started = new AtomicBoolean(false); + private final Config config; + private final boolean fetchThresholdBytesEnabled; + private final ValueUnwrapper<V> valueUnwrapper; + + // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap. 
+ private KafkaConsumerMessageSink messageSink; + // proxy is doing the actual reading + private KafkaConsumerProxy proxy; + + /* package private */final Map<TopicPartition, String> topicPartitions2Offset = new HashMap<>(); + /* package private */long perPartitionFetchThreshold; + /* package private */long perPartitionFetchThresholdBytes; + + // TODO - consider new class for KafkaSystemConsumerMetrics + + /** + * @param systemName + * @param config + * @param metrics + */ + public NewKafkaSystemConsumer( + Consumer<K,V> kafkaConsumer, + String systemName, + Config config, + String clientId, + KafkaSystemConsumerMetrics metrics, + Clock clock, + boolean fetchThresholdBytesEnabled, + ValueUnwrapper<V> valueUnwrapper) { + + super(metrics.registry(),clock, metrics.getClass().getName()); + + this.samzaConsumerMetrics = metrics; + this.clientId = clientId; + this.systemName = systemName; + this.config = config; + this.fetchThresholdBytesEnabled = fetchThresholdBytesEnabled; + this.metricName = systemName + " " + clientId; + + this.kafkaConsumer = kafkaConsumer; + this.valueUnwrapper = valueUnwrapper; + + LOG.info(String.format( + "Created SamzaLiKafkaSystemConsumer for system=%s, clientId=%s, metricName=%s with liKafkaConsumer=%s", + systemName, clientId, metricName, this.kafkaConsumer.toString())); + } + + public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) { + + Map<String, String> injectProps = new HashMap<>(); + injectProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + injectProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + + KafkaConsumerConfig consumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps); + + LOG.info("==============>Consumer properties in getKafkaConsumerImpl: systemName: {}, consumerProperties: {}", systemName, consumerConfig.originals()); + 
/* + Map<String, Object> kafkaConsumerConfig = consumerConfig.originals().entrySet().stream() + .collect(Collectors.toMap((kv)->kv.getKey(), (kv)->(Object)kv.getValue())); +*/ + + return new KafkaConsumer<byte[], byte[]>(consumerConfig.originals()); + } + + /** + * return system name for this consumer + * @return system name + */ + public String getSystemName() { + return systemName; + } + + @Override + public void start() { + if (!started.compareAndSet(false, true)) { + LOG.warn("attempting to start the consumer for the second (or more) time."); + return; + } + if(stopped.get()) { + LOG.warn("attempting to start a stopped consumer"); + return; + } +LOG.info("==============>About to start consumer"); + // initialize the subscriptions for all the registered TopicPartitions + startSubscription(); + LOG.info("==============>subscription started"); + // needs to be called after all the registrations are completed + setFetchThresholds(); + LOG.info("==============>thresholds ste"); + // Create the proxy to do the actual message reading. It is a separate thread that reads the messages from the stream + // and puts them into the sink. 
+ createConsumerProxy(); + LOG.info("==============>proxy started"); + startConsumer(); + LOG.info("==============>consumer started"); + } + + private void startSubscription() { + //subscribe to all the TopicPartitions + LOG.info("==============>startSubscription for TP: " + topicPartitions2SSP.keySet()); + try { + synchronized (kafkaConsumer) { + // we are using assign (and not subscribe), so we need to specify both topic and partition + //topicPartitions2SSP.put(new TopicPartition("FAKE PARTITION", 0), new SystemStreamPartition("Some","Another", new Partition(0))); + //topicPartitions2Offset.put(new TopicPartition("FAKE PARTITION", 0), "1234"); + kafkaConsumer.assign(topicPartitions2SSP.keySet()); + } + } catch (Exception e) { + LOG.warn("startSubscription failed.", e); + throw new SamzaException(e); + } + } + + private void createConsumerProxy() { + // create a sink for passing the messages between the proxy and the consumer + messageSink = new KafkaConsumerMessageSink(); + + // create the thread with the consumer + proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, + samzaConsumerMetrics, metricName, valueUnwrapper); + + LOG.info("==============>Created consumer proxy: " + proxy); + } + + /* + Set the offsets to start from. + Add the TopicPartitions to the proxy. + Start the proxy thread. 
+ */ + private void startConsumer() { + //set the offset for each TopicPartition + topicPartitions2Offset.forEach((tp, startingOffsetString) -> { + long startingOffset = Long.valueOf(startingOffsetString); + + try { + synchronized (kafkaConsumer) { + // TODO in the future we may need to add special handling here for BEGIN/END_OFFSET + // this will call liKafkaConsumer.seekToBegin/End() + kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value + } + } catch (Exception e) { + // all other exceptions - non recoverable + LOG.error("Got Exception while seeking to " + startingOffsetString + " for " + tp, e); + throw new SamzaException(e); + } + + LOG.info("==============>Changing Consumer's position for tp = " + tp + " to " + startingOffsetString); + + // add the partition to the proxy + proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset); + }); + + // start the proxy thread + if (proxy != null && !proxy.isRunning()) { + proxy.start(); + } + } + + private void setFetchThresholds() { + // get the thresholds, and set defaults if not defined. + KafkaConfig kafkaConfig = new KafkaConfig(config); + Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName); + long fetchThreshold = FETCH_THRESHOLD; + if(fetchThresholdOption.isDefined()) { + fetchThreshold = Long.valueOf(fetchThresholdOption.get()); + LOG.info("fetchThresholdOption is defined. fetchThreshold=" + fetchThreshold); + } + Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName); + long fetchThresholdBytes = FETCH_THRESHOLD_BYTES; + if(fetchThresholdBytesOption.isDefined()) { + fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get()); + LOG.info("fetchThresholdBytesOption is defined. 
fetchThresholdBytes=" + fetchThresholdBytes); + } + LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; fetchThreshold=" + fetchThreshold); + LOG.info("topicPartitions2Offset #=" + topicPartitions2Offset.size() + "; topicPartition2SSP #=" + topicPartitions2SSP.size()); + + if (topicPartitions2SSP.size() > 0) { + perPartitionFetchThreshold = fetchThreshold / topicPartitions2SSP.size(); + LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold); + if(fetchThresholdBytesEnabled) { + // currently this feature cannot be enabled, because we do not have the size of the messages available. + // messages get double buffered, hence divide by 2 + perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitions2SSP.size(); + LOG.info("perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes=" + perPartitionFetchThresholdBytes); + } + } + } + + @Override + public void stop() { + if (!stopped.compareAndSet(false, true)) { + LOG.warn("attempting to stop stopped consumer."); + return; + } + + LOG.warn("Stopping SamzaRawLiKafkaConsumer + " + this); + // stop the proxy (with 5 minutes timeout) + if(proxy != null) + proxy.stop(TimeUnit.MINUTES.toMillis(5)); + + try { + synchronized (kafkaConsumer) { + kafkaConsumer.close(); + } + } catch (Exception e) { + LOG.warn("failed to stop SamzaRawLiKafkaConsumer + " + this, e); + } + } + + /* + record the ssp and the offset. Do not submit it to the consumer yet. 
+ */ + @Override + public void register(SystemStreamPartition systemStreamPartition, String offset) { + if (!systemStreamPartition.getSystem().equals(systemName)) { + LOG.warn("ignoring SSP " + systemStreamPartition + ", because this consumer's system is " + systemName); + return; + } + super.register(systemStreamPartition, offset); + + TopicPartition tp = toTopicPartition(systemStreamPartition); + + topicPartitions2SSP.put(tp, systemStreamPartition); + + LOG.info("==============>registering ssp = " + systemStreamPartition + " with offset " + offset); + + String existingOffset = topicPartitions2Offset.get(tp); + // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages. + if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) { + topicPartitions2Offset.put(tp, offset); + } + + samzaConsumerMetrics.registerTopicAndPartition(toTopicAndPartition(tp)); + } + + /** + * Compare two String offsets. + * Note: There is a method in KafkaAdmin that does this, but it would require instantiation of a SystemAdmin for each consumer.
+ * @param off1 + * @param off2 + * @return see {@link Long#compareTo(Long)} + */ + public static int compareOffsets(String off1, String off2) { + return Long.valueOf(off1).compareTo(Long.valueOf(off2)); + } + + @Override + public String toString() { + return systemName + " " + clientId + "/" + super.toString(); + } + + @Override + public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll( + Set<SystemStreamPartition> systemStreamPartitions, long timeout) + throws InterruptedException { + + // check if the proxy is running + if(!proxy.isRunning()) { + stop(); + if (proxy.getFailureCause() != null) { + String message = "LiKafkaConsumerProxy has stopped"; + if(proxy.getFailureCause() instanceof org.apache.kafka.common.errors.TopicAuthorizationException) + message += " due to TopicAuthorizationException Please refer to go/samzaacluserguide to correctly set up acls for your topic"; + throw new SamzaException(message, proxy.getFailureCause()); + } else { + LOG.warn("Failure cause not populated for LiKafkaConsumerProxy"); + throw new SamzaException("LiKafkaConsumerProxy has stopped"); + } + } + + return super.poll(systemStreamPartitions, timeout); + } + + public static TopicAndPartition toTopicAndPartition(TopicPartition tp) { + return new TopicAndPartition(tp.topic(), tp.partition()); + } + + public static TopicAndPartition toTopicAndPartition(SystemStreamPartition ssp) { + return new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); + } + + public static TopicPartition toTopicPartition(SystemStreamPartition ssp) { + return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); + } + + public static SystemStreamPartition toSystemStreamPartition(String systemName, TopicAndPartition tp) { + return new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition())); + } + + //////////////////////////////////// + // inner class for the message sink + //////////////////////////////////// + public class 
KafkaConsumerMessageSink { + + public void setIsAtHighWatermark(SystemStreamPartition ssp, boolean isAtHighWatermark) { + setIsAtHead(ssp, isAtHighWatermark); + } + + boolean needsMoreMessages(SystemStreamPartition ssp) { + if(LOG.isDebugEnabled()) { + LOG.debug("needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};" + + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled, getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, + getNumMessagesInQueue(ssp), perPartitionFetchThreshold); + } + + if (fetchThresholdBytesEnabled) { + return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes; // TODO Validate + } else { + return getNumMessagesInQueue(ssp) < perPartitionFetchThreshold; + } + } + + void addMessage(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) { + LOG.info("==============>Incoming message ssp = {}: envelope = {}.", ssp, envelope); + + try { + put(ssp, envelope); + } catch (InterruptedException e) { + throw new SamzaException( + String.format("Interrupted while trying to add message with offset %s for ssp %s", + envelope.getOffset(), + ssp)); + } + } + } // end of KafkaMessageSink class + /////////////////////////////////////////////////////////////////////////// +} http://git-wip-us.apache.org/repos/asf/samza/blob/72544606/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index 065170c..8544dbf 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -88,12 +88,12 @@ class 
TestKafkaCheckpointManager extends KafkaServerTestHarness { zkClient.close // read before topic exists should result in a null checkpoint - //val readCp = readCheckpoint(checkpointTopic, taskName) - //assertNull(readCp) + val readCp = readCheckpoint(checkpointTopic, taskName) + assertNull(readCp) writeCheckpoint(checkpointTopic, taskName, checkpoint1) + assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName)) -try {Thread.sleep(20000)} catch { case e:Exception =>() } // writing a second message and reading it returns a more recent checkpoint writeCheckpoint(checkpointTopic, taskName, checkpoint2) assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName)) @@ -194,7 +194,7 @@ try {Thread.sleep(20000)} catch { case e:Exception =>() } val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory]) val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, props) - System.out.println("CONFIG:" + config) + System.out.println("CONFIG = " + config) new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config, new NoOpMetricsRegistry, serde) }
