Repository: kafka Updated Branches: refs/heads/trunk 063d534c5 -> 3e85f131e
KAFKA-4642: Improve test coverage of ProcessorStateManager Most of the exception paths weren't covered. Now they are. Author: Damian Guy <damian....@gmail.com> Reviewers: Eno Thereska, Guozhang Wang Closes #2442 from dguy/KAFKA-4642 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e85f131 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e85f131 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e85f131 Branch: refs/heads/trunk Commit: 3e85f131e6af5d2ef34499d0b2aca7afca85d79c Parents: 063d534 Author: Damian Guy <damian....@gmail.com> Authored: Wed Feb 1 20:17:06 2017 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Feb 1 20:17:06 2017 -0800 ---------------------------------------------------------------------- .../internals/ProcessorStateManagerTest.java | 237 ++++++++++++++++++- 1 file changed, 231 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3e85f131/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 602601a..ba27c53 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -24,10 +24,13 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import 
org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.LockException; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.StateSerdes; @@ -42,24 +45,27 @@ import org.junit.Test; import java.io.File; import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; public class ProcessorStateManagerTest { - private File baseDir; - private StateDirectory stateDirectory; - public static class MockRestoreConsumer extends MockConsumer<byte[], byte[]> { private final Serializer<Integer> serializer = new IntegerSerializer(); @@ -196,6 +202,15 @@ public class ProcessorStateManagerTest { private final String nonPersistentStoreName = "nonPersistentStore"; private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName); private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName); + private final String storeName = "mockStateStore"; + private final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName); 
+ private final TopicPartition changelogTopicPartition = new TopicPartition(changelogTopic, 0); + private final TaskId taskId = new TaskId(0, 1); + private final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + private final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(storeName, true); + private File baseDir; + private StateDirectory stateDirectory; + @Before public void setup() { @@ -498,14 +513,11 @@ public class ProcessorStateManagerTest { @Test public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception { - final TaskId taskId = new TaskId(0, 1); final File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); // write an empty checkpoint file final OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile); oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap()); - final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) )); @@ -520,4 +532,217 @@ public class ProcessorStateManagerTest { assertFalse(checkpointFile.exists()); } + @Test + public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws Exception { + final File taskDirectory = stateDirectory.directoryForTask(taskId); + final FileChannel channel = FileChannel.open(new File(taskDirectory, + StateDirectory.LOCK_FILE_NAME).toPath(), + StandardOpenOption.CREATE, + StandardOpenOption.WRITE); + // lock the task directory + final FileLock lock = channel.lock(); + + try { + new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap()); + fail("Should have thrown LockException"); + } catch (final LockException e) { + // pass + } finally { + lock.release(); + channel.close(); + } + } + + @Test + 
public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws Exception { + final ProcessorStateManager stateManager = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + false, + stateDirectory, + Collections.<String, String>emptyMap()); + + try { + stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), true, null); + fail("should have thrown illegal argument exception when store name same as checkpoint file"); + } catch (final IllegalArgumentException e) { + //pass + } + } + + @Test + public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() throws Exception { + final ProcessorStateManager stateManager = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + false, + stateDirectory, + Collections.<String, String>emptyMap()); + stateManager.register(mockStateStore, false, null); + + try { + stateManager.register(mockStateStore, false, null); + fail("should have thrown illegal argument exception when store with same name already registered"); + } catch (final IllegalArgumentException e) { + // pass + } + + } + + @Test + public void shouldThrowStreamsExceptionWhenRestoreConsumerThrowsTimeoutException() throws Exception { + final MockRestoreConsumer mockRestoreConsumer = new MockRestoreConsumer() { + @Override + public List<PartitionInfo> partitionsFor(final String topic) { + throw new TimeoutException("KABOOM!"); + } + }; + final ProcessorStateManager stateManager = new ProcessorStateManager(taskId, + noPartitions, + mockRestoreConsumer, + false, + stateDirectory, + Collections.singletonMap(storeName, changelogTopic)); + try { + stateManager.register(mockStateStore, false, null); + fail("should have thrown StreamsException due to timeout exception"); + } catch (final StreamsException e) { + // pass + } + } + + @Test + public void shouldThrowStreamsExceptionWhenRestoreConsumerReturnsNullPartitions() throws 
Exception { + final MockRestoreConsumer mockRestoreConsumer = new MockRestoreConsumer() { + @Override + public List<PartitionInfo> partitionsFor(final String topic) { + return null; + } + }; + final ProcessorStateManager stateManager = new ProcessorStateManager(taskId, + noPartitions, + mockRestoreConsumer, + false, + stateDirectory, + Collections.singletonMap(storeName, changelogTopic)); + try { + stateManager.register(mockStateStore, false, null); + fail("should have thrown StreamsException due to null partitions"); + } catch (final StreamsException e) { + // pass + } + } + + @Test + public void shouldThrowStreamsExceptionWhenPartitionForTopicNotFound() throws Exception { + final MockRestoreConsumer mockRestoreConsumer = new MockRestoreConsumer() { + @Override + public List<PartitionInfo> partitionsFor(final String topic) { + return Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, null)); + } + }; + final ProcessorStateManager stateManager = new ProcessorStateManager(taskId, + Collections.singleton(new TopicPartition(changelogTopic, 1)), + mockRestoreConsumer, + false, + stateDirectory, + Collections.singletonMap(storeName, changelogTopic)); + + try { + stateManager.register(mockStateStore, false, null); + fail("should have thrown StreamsException due to partition for topic not found"); + } catch (final StreamsException e) { + // pass + } + } + + @Test + public void shouldThrowIllegalStateExceptionWhenRestoringStateAndSubscriptionsNonEmpty() throws Exception { + final MockRestoreConsumer mockRestoreConsumer = new MockRestoreConsumer() { + @Override + public List<PartitionInfo> partitionsFor(final String topic) { + return Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, null)); + } + }; + final ProcessorStateManager stateManager = new ProcessorStateManager(taskId, + Collections.singleton(changelogTopicPartition), + mockRestoreConsumer, + false, + stateDirectory, + Collections.singletonMap(storeName, 
changelogTopic)); + + mockRestoreConsumer.subscribe(Collections.singleton("sometopic")); + + try { + stateManager.register(mockStateStore, false, null); + fail("should throw IllegalStateException when restore consumer has non-empty subscriptions"); + } catch (final IllegalStateException e) { + // pass + } + } + + @Test + public void shouldThrowIllegalStateExceptionWhenRestoreConsumerPositionGreaterThanEndOffset() throws Exception { + final AtomicInteger position = new AtomicInteger(10); + final MockRestoreConsumer mockRestoreConsumer = new MockRestoreConsumer() { + @Override + public synchronized long position(final TopicPartition partition) { + // need to make the end position change to trigger the exception + return position.getAndIncrement(); + } + }; + + mockRestoreConsumer.updatePartitions(changelogTopic, Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, null))); + + final ProcessorStateManager stateManager = new ProcessorStateManager(taskId, + Collections.singleton(changelogTopicPartition), + mockRestoreConsumer, + false, + stateDirectory, + Collections.singletonMap(storeName, changelogTopic)); + + stateManager.putOffsetLimit(changelogTopicPartition, 1); + // add a record with an offset less than the limit of 1 + mockRestoreConsumer.bufferRecord(new ConsumerRecord<>(changelogTopic, 0, 0, 1, 1)); + + + try { + stateManager.register(mockStateStore, false, mockStateStore.stateRestoreCallback); + fail("should have thrown IllegalStateException as end offset has changed"); + } catch (final IllegalStateException e) { + // pass + } + + } + + @Test + public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws Exception { + restoreConsumer.updatePartitions(changelogTopic, Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, null))); + + final ProcessorStateManager stateManager = new ProcessorStateManager(taskId, + Collections.singleton(changelogTopicPartition), + restoreConsumer, + false, + 
stateDirectory, + Collections.singletonMap(storeName, changelogTopic)); + + final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) { + @Override + public void close() { + throw new RuntimeException("KABOOM!"); + } + }; + stateManager.putOffsetLimit(changelogTopicPartition, 1); + restoreConsumer.bufferRecord(new ConsumerRecord<>(changelogTopic, 0, 1, 1, 1)); + stateManager.register(stateStore, false, stateStore.stateRestoreCallback); + + try { + stateManager.close(Collections.<TopicPartition, Long>emptyMap()); + fail("Should throw ProcessorStateException if store close throws exception"); + } catch (final ProcessorStateException e) { + // pass + } + } + + }