Repository: samza Updated Branches: refs/heads/master 6d20ee7e4 -> a8a8dc78d
SAMZA-1888: Kafka consumer improvements Author: Boris S <bshkol...@linkedin.com> Author: Boris S <bor...@apache.org> Author: Boris Shkolnik <bshko...@linkedin.com> Reviewers: bharathkk <codin.mart...@gmail.com> Closes #738 from sborya/KafkaConsumerImprovements Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a8a8dc78 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a8a8dc78 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a8a8dc78 Branch: refs/heads/master Commit: a8a8dc78d6aa375ab2220f8fcf3998d3f012d27d Parents: 6d20ee7 Author: Boris S <bshkol...@linkedin.com> Authored: Wed Oct 17 16:38:19 2018 -0700 Committer: Boris S <bshkol...@linkedin.com> Committed: Wed Oct 17 16:38:19 2018 -0700 ---------------------------------------------------------------------- .../samza/system/kafka/KafkaSystemAdmin.java | 90 ++++++++------ .../samza/system/kafka/KafkaSystemConsumer.java | 17 ++- .../kafka/DefaultFetchSimpleConsumer.scala | 66 ----------- .../apache/samza/system/kafka/GetOffset.scala | 116 ------------------- .../samza/system/kafka/TestGetOffset.scala | 110 ------------------ 5 files changed, 60 insertions(+), 339 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a8a8dc78/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java index f761ab3..d2ceafb 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java @@ -80,9 +80,9 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin { protected final String systemName; protected final Consumer metadataConsumer; + protected final Config config; - // get ZkUtils object to connect to Kafka's ZK. - private final Supplier<ZkUtils> getZkConnection; + protected AdminClient adminClient = null; // Custom properties to create a new coordinator stream. private final Properties coordinatorStreamProperties; @@ -96,16 +96,14 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin { // Kafka properties for intermediate topics creation private final Map<String, Properties> intermediateStreamProperties; - // adminClient is required for deleteCommittedMessages operation - private final AdminClient adminClient; - // used for intermediate streams - private final boolean deleteCommittedMessages; + protected final boolean deleteCommittedMessages; private final AtomicBoolean stopped = new AtomicBoolean(false); public KafkaSystemAdmin(String systemName, Config config, Consumer metadataConsumer) { this.systemName = systemName; + this.config = config; if (metadataConsumer == null) { throw new SamzaException( @@ -113,35 +111,6 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin { } this.metadataConsumer = metadataConsumer; - // populate brokerList from either consumer or producer configs - Properties props = new Properties(); - String brokerList = config.get( - String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - if (brokerList == null) { - brokerList = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName, - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - } - if (brokerList == null) { - throw new SamzaException( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName); - } - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); - - // kafka.admin.AdminUtils requires zkConnect - // this will change after we move to the new org.apache..AdminClient - String zkConnect = - config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT)); - if (StringUtils.isBlank(zkConnect)) { - throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName); - } - props.put(ZOOKEEPER_CONNECT, zkConnect); - - adminClient = AdminClient.create(props); - - getZkConnection = () -> { - return ZkUtils.apply(zkConnect, 6000, 6000, false); - }; - KafkaConfig kafkaConfig = new KafkaConfig(config); coordinatorStreamReplicationFactor = Integer.valueOf(kafkaConfig.getCoordinatorReplicationFactor()); coordinatorStreamProperties = KafkaSystemAdminUtilsScala.getCoordinatorTopicProperties(kafkaConfig); @@ -197,6 +166,8 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin { } catch (Exception e) { LOG.warn("metadataConsumer.close for system " + systemName + " failed with exception.", e); } + } + if (adminClient != null) { try { adminClient.close(); } catch (Exception e) { @@ -546,14 +517,14 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin { public boolean createStream(StreamSpec streamSpec) { LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName()); - return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), getZkConnection); + return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), getZkConnection()); } @Override public boolean clearStream(StreamSpec streamSpec) { LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName()); - KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection); + KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection()); Map<String, List<PartitionInfo>> topicsMetadata = getTopicMetadata(ImmutableSet.of(streamSpec.getPhysicalName())); return topicsMetadata.get(streamSpec.getPhysicalName()).isEmpty(); @@ -630,11 +601,56 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin { @Override public void deleteMessages(Map<SystemStreamPartition, String> offsets) { if (deleteCommittedMessages) { + if (adminClient == null) { + adminClient = AdminClient.create(createAdminClientProperties()); + } KafkaSystemAdminUtilsScala.deleteMessages(adminClient, offsets); deleteMessageCalled = true; } } + protected Properties createAdminClientProperties() { + // populate brokerList from either consumer or producer configs + Properties props = new Properties(); + // included SSL settings if needed + + props.putAll(config.subset(String.format("systems.%s.consumer.", systemName), true)); + + //validate brokerList + String brokerList = config.get( + String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + if (brokerList == null) { + brokerList = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName, + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + } + if (brokerList == null) { + throw new SamzaException( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName); + } + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + + + // kafka.admin.AdminUtils requires zkConnect + // this will change after we move to the new org.apache..AdminClient + String zkConnect = + config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT)); + if (StringUtils.isBlank(zkConnect)) { + throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName); + } + props.put(ZOOKEEPER_CONNECT, zkConnect); + + return props; + } + + private Supplier<ZkUtils> getZkConnection() { + String zkConnect = + config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT)); + if (StringUtils.isBlank(zkConnect)) { + throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName); + } + return () -> ZkUtils.apply(zkConnect, 6000, 6000, false); + } + /** * Container for metadata about offsets. */ http://git-wip-us.apache.org/repos/asf/samza/blob/a8a8dc78/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java index 65d0e42..b5f283a 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java @@ -77,8 +77,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy /** * Create a KafkaSystemConsumer for the provided {@code systemName} + * @param kafkaConsumer kafka Consumer object to be used by this system consumer * @param systemName system name for which we create the consumer * @param config application config + * @param clientId clientId from the kafka consumer to be used in the KafkaConsumerProxy * @param metrics metrics for this KafkaSystemConsumer * @param clock system clock */ @@ -106,12 +108,13 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy /** * Create internal kafka consumer object, which will be used in the Proxy. + * @param <K> key type for the consumer + * @param <V> value type for the consumer * @param systemName system name for which we create the consumer * @param kafkaConsumerConfig config object for Kafka's KafkaConsumer - * @return KafkaConsumer object + * @return KafkaConsumer newly created kafka consumer object */ - public static <K,V> KafkaConsumer<K, V> createKafkaConsumerImpl(String systemName, - HashMap<String, Object> kafkaConsumerConfig) { + public static <K, V> KafkaConsumer<K, V> createKafkaConsumerImpl(String systemName, HashMap<String, Object> kafkaConsumerConfig) { LOG.info("Instantiating KafkaConsumer for systemName {} with properties {}", systemName, kafkaConsumerConfig); return new KafkaConsumer<>(kafkaConsumerConfig); @@ -176,7 +179,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy throw new SamzaException(msg, e); } - LOG.info("{}: Changing consumer's starting offset for tp = %s to %s", this, tp, startingOffsetString); + LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString); // add the partition to the proxy proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset); @@ -310,16 +313,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy return super.poll(systemStreamPartitions, timeout); } - /** - * convert from TopicPartition to TopicAndPartition - */ public static TopicAndPartition toTopicAndPartition(TopicPartition tp) { return new TopicAndPartition(tp.topic(), tp.partition()); } - /** - * convert to TopicPartition from SystemStreamPartition - */ public static TopicPartition toTopicPartition(SystemStreamPartition ssp) { return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId()); } http://git-wip-us.apache.org/repos/asf/samza/blob/a8a8dc78/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala deleted file mode 100644 index 5b4886a..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.samza.system.kafka - -import kafka.consumer.SimpleConsumer -import kafka.api._ -import kafka.common.TopicAndPartition -import kafka.consumer.ConsumerConfig - -class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soTimeout: scala.Int, bufferSize: scala.Int, - clientId: scala.Predef.String, fetchSize: StreamFetchSizes = new StreamFetchSizes, - minBytes: Int = ConsumerConfig.MinFetchBytes, maxWait: Int = ConsumerConfig.MaxFetchWaitMs) - extends SimpleConsumer(host, port, soTimeout, bufferSize, clientId) { - - def defaultFetch(fetches: (TopicAndPartition, Long)*) = { - val fbr = new FetchRequestBuilder().maxWait(maxWait) - .minBytes(minBytes) - .clientId(clientId) - - fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize.streamValue.getOrElse(f._1.topic, fetchSize.defaultValue))) - - this.fetch(fbr.build()) - } - - override def close(): Unit = super.close() - - override def send(request: TopicMetadataRequest): TopicMetadataResponse = super.send(request) - - override def fetch(request: FetchRequest): FetchResponse = super.fetch(request) - - override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = super.getOffsetsBefore(request) - - override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = super.commitOffsets(request) - - override def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse = super.fetchOffsets(request) - - override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = super.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId) -} - -/** - * a simple class for holding values for the stream's fetch size (fetch.message.max.bytes). - * The stream-level fetch size values are put in the streamValue map streamName -> fetchSize. - * If stream-level fetch size is not defined, use the default value. The default value is the - * Kafka's default fetch size value or the system-level fetch size value (if defined). - */ -case class StreamFetchSizes(defaultValue: Int = ConsumerConfig.MaxFetchSize, streamValue: Map[String, Int] = Map[String, Int]()) - http://git-wip-us.apache.org/repos/asf/samza/blob/a8a8dc78/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala deleted file mode 100644 index 55b4611..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala +++ /dev/null @@ -1,116 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.samza.system.kafka - -import org.apache.kafka.common.errors.OffsetOutOfRangeException -import kafka.api._ -import kafka.common.TopicAndPartition -import kafka.api.PartitionOffsetRequestInfo -import org.apache.samza.util.Logging -import org.apache.samza.util.KafkaUtil - -/** - * GetOffset validates offsets for topic partitions, and manages fetching new - * offsets for topics using Kafka's auto.offset.reset configuration. - */ -class GetOffset( - /** - * The default auto.offset.reset to use if a topic is not overridden in - * autoOffsetResetTopics. Any value other than "earliest" or "latest" will - * result in an exception when getRestOffset is called. - */ - default: String, - - /** - * Topic-level overrides for auto.offset.reset. Any value other than - * "earliest" or "latest" will result in an exception when getRestOffset is - * called. - */ - autoOffsetResetTopics: Map[String, String] = Map()) extends Logging with Toss { - - /** - * Checks if an offset is valid for a given topic/partition. Validity is - * defined as an offset that returns a readable non-empty message set with - * no exceptions. - */ - def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = { - info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition)) - - try { - val messages = consumer.defaultFetch((topicAndPartition, offset.toLong)) - - if (messages.hasError) { - KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception()) - } - - info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition)) - - true - } catch { - case e: OffsetOutOfRangeException => false - } - } - - /** - * Uses a topic's auto.offset.reset setting (defined via the - * autoOffsetResetTopics map in the constructor) to fetch either the - * earliest or latest offset. If neither earliest or latest is defined for - * the topic in question, the default supplied in the constructor will be - * used. - */ - def getResetOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition) = { - val offsetRequest = new OffsetRequest(Map(topicAndPartition -> new PartitionOffsetRequestInfo(getAutoOffset(topicAndPartition.topic), 1))) - val offsetResponse = consumer.getOffsetsBefore(offsetRequest) - val partitionOffsetResponse = offsetResponse - .partitionErrorAndOffsets - .get(topicAndPartition) - .getOrElse(toss("Unable to find offset information for %s" format topicAndPartition)) - - KafkaUtil.maybeThrowException(partitionOffsetResponse.error.exception()) - - partitionOffsetResponse - .offsets - .headOption - .getOrElse(toss("Got response, but no offsets defined for %s" format topicAndPartition)) - } - - /** - * Returns either the earliest or latest setting (a Kafka constant) for a - * given topic using the autoOffsetResetTopics map defined in the - * constructor. If the topic is not defined in autoOffsetResetTopics, the - * default value supplied in the constructor will be used. This is used in - * conjunction with getResetOffset to fetch either the earliest or latest - * offset for a topic. - */ - private def getAutoOffset(topic: String): Long = { - info("Checking if auto.offset.reset is defined for topic %s" format (topic)) - autoOffsetResetTopics.getOrElse(topic, default) match { - case OffsetRequest.LargestTimeString => - info("Got reset of type %s." format OffsetRequest.LargestTimeString) - OffsetRequest.LatestTime - case OffsetRequest.SmallestTimeString => - info("Got reset of type %s." format OffsetRequest.SmallestTimeString) - OffsetRequest.EarliestTime - case other => toss("Can't get offset value for topic %s due to invalid value: %s" format (topic, other)) - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/a8a8dc78/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala deleted file mode 100644 index ab82609..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestGetOffset.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.system.kafka - -import java.nio.ByteBuffer - -import kafka.api._ -import kafka.common.TopicAndPartition -import kafka.consumer.SimpleConsumer -import kafka.message.Message -import kafka.message.ByteBufferMessageSet -import org.apache.kafka.common.errors.OffsetOutOfRangeException -import org.junit._ -import org.junit.Assert._ -import org.mockito.Mockito -import org.mockito.Mockito._ -import org.mockito.Matchers._ -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer - -class TestGetOffset { - - private val outOfRangeOffset : String = "0" - - /** - * An empty message set is still a valid offset. It just means that the - * offset was for the upcoming message, which hasn't yet been written. The - * fetch request times out in such a case, and an empty message set is - * returned. - */ - @Test - def testIsValidOffsetWorksWithEmptyMessageSet { - val getOffset = new GetOffset(OffsetRequest.LargestTimeString) - // Should not throw an exception. - assertTrue(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), "1234")) - } - - /** - * An empty message set is still a valid offset. It just means that the - * offset was for the upcoming message, which hasn't yet been written. The - * fetch request times out in such a case, and an empty message set is - * returned. - */ - @Test - def testIsValidOffsetWorksWithOffsetOutOfRangeException { - val getOffset = new GetOffset(OffsetRequest.LargestTimeString) - // Should not throw an exception. - assertFalse(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), outOfRangeOffset)) - } - - /** - * Create a default fetch simple consumer that returns empty message sets. - */ - def getMockDefaultFetchSimpleConsumer = { - new DefaultFetchSimpleConsumer("", 0, 0, 0, "") { - val sc = Mockito.mock(classOf[SimpleConsumer]) - - // Build an empty fetch response. - val fetchResponse = { - val fetchResponse = Mockito.mock(classOf[FetchResponse]) - val messageSet = { - val messageSet = Mockito.mock(classOf[ByteBufferMessageSet]) - val messages = List() - - def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer])) - - when(messageSet.sizeInBytes).thenReturn(0) - when(messageSet.size).thenReturn(0) - when(messageSet.iterator).thenReturn(messages.iterator) - - messageSet - } - when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet) - - fetchResponse - } - - doAnswer(new Answer[FetchResponse] { - override def answer(invocation: InvocationOnMock): FetchResponse = { - if (invocation.getArgumentAt(0, classOf[FetchRequest]).requestInfo.exists( - req => req._2.offset.toString.equals(outOfRangeOffset))) { - throw new OffsetOutOfRangeException("test exception") - } - fetchResponse - } - }).when(sc).fetch(any(classOf[FetchRequest])) - - override def fetch(request: FetchRequest): FetchResponse = { - sc.fetch(request) - } - } - } -}