Repository: kafka
Updated Branches:
  refs/heads/trunk 063d534c5 -> 3e85f131e


KAFKA-4642: Improve test coverage of ProcessorStateManager

Most of the exception paths weren't covered. Now they are.

Author: Damian Guy <damian....@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #2442 from dguy/KAFKA-4642


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e85f131
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e85f131
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e85f131

Branch: refs/heads/trunk
Commit: 3e85f131e6af5d2ef34499d0b2aca7afca85d79c
Parents: 063d534
Author: Damian Guy <damian....@gmail.com>
Authored: Wed Feb 1 20:17:06 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Wed Feb 1 20:17:06 2017 -0800

----------------------------------------------------------------------
 .../internals/ProcessorStateManagerTest.java    | 237 ++++++++++++++++++-
 1 file changed, 231 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3e85f131/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 602601a..ba27c53 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -24,10 +24,13 @@ import 
org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -42,24 +45,27 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 public class ProcessorStateManagerTest {
 
-    private File baseDir;
-    private StateDirectory stateDirectory;
-
     public static class MockRestoreConsumer extends MockConsumer<byte[], 
byte[]> {
         private final Serializer<Integer> serializer = new IntegerSerializer();
 
@@ -196,6 +202,15 @@ public class ProcessorStateManagerTest {
     private final String nonPersistentStoreName = "nonPersistentStore";
     private final String persistentStoreTopicName = 
ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
     private final String nonPersistentStoreTopicName = 
ProcessorStateManager.storeChangelogTopic(applicationId, 
nonPersistentStoreName);
+    private final String storeName = "mockStateStore";
+    private final String changelogTopic = 
ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
+    private final TopicPartition changelogTopicPartition = new 
TopicPartition(changelogTopic, 0);
+    private final TaskId taskId = new TaskId(0, 1);
+    private final MockRestoreConsumer restoreConsumer = new 
MockRestoreConsumer();
+    private final MockStateStoreSupplier.MockStateStore mockStateStore = new 
MockStateStoreSupplier.MockStateStore(storeName, true);
+    private File baseDir;
+    private StateDirectory stateDirectory;
+
 
     @Before
     public void setup() {
@@ -498,14 +513,11 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws 
Exception {
-        final TaskId taskId = new TaskId(0, 1);
         final File checkpointFile = new 
File(stateDirectory.directoryForTask(taskId), 
ProcessorStateManager.CHECKPOINT_FILE_NAME);
         // write an empty checkpoint file
         final OffsetCheckpoint oldCheckpoint = new 
OffsetCheckpoint(checkpointFile);
         oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
 
-        final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
         restoreConsumer.updatePartitions(persistentStoreTopicName, 
Utils.mkList(
                 new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), 
new Node[0], new Node[0])
         ));
@@ -520,4 +532,217 @@ public class ProcessorStateManagerTest {
         assertFalse(checkpointFile.exists());
     }
 
+    @Test
+    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws 
Exception {
+        final File taskDirectory = stateDirectory.directoryForTask(taskId);
+        final FileChannel channel = FileChannel.open(new File(taskDirectory,
+                                                              
StateDirectory.LOCK_FILE_NAME).toPath(),
+                                                     StandardOpenOption.CREATE,
+                                                     StandardOpenOption.WRITE);
+        // lock the task directory
+        final FileLock lock = channel.lock();
+
+        try {
+            new ProcessorStateManager(taskId, noPartitions, restoreConsumer, 
false, stateDirectory, Collections.<String, String>emptyMap());
+            fail("Should have thrown LockException");
+        } catch (final LockException e) {
+           // pass
+        } finally {
+            lock.release();
+            channel.close();
+        }
+    }
+
+    @Test
+    public void 
shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() 
throws Exception {
+        final ProcessorStateManager stateManager = new 
ProcessorStateManager(taskId,
+                                                                             
noPartitions,
+                                                                             
restoreConsumer,
+                                                                             
false,
+                                                                             
stateDirectory,
+                                                                             
Collections.<String, String>emptyMap());
+
+        try {
+            stateManager.register(new 
MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME,
 true), true, null);
+            fail("should have thrown illegal argument exception when store 
name same as checkpoint file");
+        } catch (final IllegalArgumentException e) {
+            //pass
+        }
+    }
+
+    @Test
+    public void 
shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered()
 throws Exception {
+        final ProcessorStateManager stateManager = new 
ProcessorStateManager(taskId,
+                                                                             
noPartitions,
+                                                                             
restoreConsumer,
+                                                                             
false,
+                                                                             
stateDirectory,
+                                                                             
Collections.<String, String>emptyMap());
+        stateManager.register(mockStateStore, false, null);
+
+        try {
+            stateManager.register(mockStateStore, false, null);
+            fail("should have thrown illegal argument exception when store 
with same name already registered");
+        } catch (final IllegalArgumentException e) {
+            // pass
+        }
+        
+    }
+
+    @Test
+    public void 
shouldThrowStreamsExceptionWhenRestoreConsumerThrowsTimeoutException() throws 
Exception {
+        final MockRestoreConsumer mockRestoreConsumer = new 
MockRestoreConsumer() {
+            @Override
+            public List<PartitionInfo> partitionsFor(final String topic) {
+                throw new TimeoutException("KABOOM!");
+            }
+        };
+        final ProcessorStateManager stateManager = new 
ProcessorStateManager(taskId,
+                                                                             
noPartitions,
+                                                                             
mockRestoreConsumer,
+                                                                             
false,
+                                                                             
stateDirectory,
+                                                                             
Collections.singletonMap(storeName, changelogTopic));
+        try {
+            stateManager.register(mockStateStore, false, null);
+            fail("should have thrown StreamsException due to timeout 
exception");
+        } catch (final StreamsException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void 
shouldThrowStreamsExceptionWhenRestoreConsumerReturnsNullPartitions() throws 
Exception {
+        final MockRestoreConsumer mockRestoreConsumer = new 
MockRestoreConsumer() {
+            @Override
+            public List<PartitionInfo> partitionsFor(final String topic) {
+                return null;
+            }
+        };
+        final ProcessorStateManager stateManager = new 
ProcessorStateManager(taskId,
+                                                                             
noPartitions,
+                                                                             
mockRestoreConsumer,
+                                                                             
false,
+                                                                             
stateDirectory,
+                                                                             
Collections.singletonMap(storeName, changelogTopic));
+        try {
+            stateManager.register(mockStateStore, false, null);
+            fail("should have thrown StreamsException due to timeout 
exception");
+        } catch (final StreamsException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldThrowStreamsExceptionWhenPartitionForTopicNotFound() 
throws Exception {
+        final MockRestoreConsumer mockRestoreConsumer = new 
MockRestoreConsumer() {
+            @Override
+            public List<PartitionInfo> partitionsFor(final String topic) {
+                return Collections.singletonList(new 
PartitionInfo(changelogTopic, 0, null, null, null));
+            }
+        };
+        final ProcessorStateManager stateManager = new 
ProcessorStateManager(taskId,
+                                                                             
Collections.singleton(new TopicPartition(changelogTopic, 1)),
+                                                                             
mockRestoreConsumer,
+                                                                             
false,
+                                                                             
stateDirectory,
+                                                                             
Collections.singletonMap(storeName, changelogTopic));
+
+        try {
+            stateManager.register(mockStateStore, false, null);
+            fail("should have thrown StreamsException due to partition for 
topic not found");
+        } catch (final StreamsException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void 
shouldThrowIllegalStateExceptionWhenRestoringStateAndSubscriptionsNonEmpty() 
throws Exception {
+        final MockRestoreConsumer mockRestoreConsumer = new 
MockRestoreConsumer() {
+            @Override
+            public List<PartitionInfo> partitionsFor(final String topic) {
+                return Collections.singletonList(new 
PartitionInfo(changelogTopic, 0, null, null, null));
+            }
+        };
+        final ProcessorStateManager stateManager = new 
ProcessorStateManager(taskId,
+                                                                             
Collections.singleton(changelogTopicPartition),
+                                                                             
mockRestoreConsumer,
+                                                                             
false,
+                                                                             
stateDirectory,
+                                                                             
Collections.singletonMap(storeName, changelogTopic));
+
+        mockRestoreConsumer.subscribe(Collections.singleton("sometopic"));
+
+        try {
+            stateManager.register(mockStateStore, false, null);
+            fail("should throw IllegalStateException when restore consumer has 
non-empty subscriptions");
+        } catch (final IllegalStateException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void 
shouldThrowIllegalStateExceptionWhenRestoreConsumerPositionGreaterThanEndOffset()
 throws Exception {
+        final AtomicInteger position = new AtomicInteger(10);
+        final MockRestoreConsumer mockRestoreConsumer = new 
MockRestoreConsumer() {
+            @Override
+            public synchronized long position(final TopicPartition partition) {
+                // need to make the end position change to trigger the 
exception
+                return position.getAndIncrement();
+            }
+        };
+
+        mockRestoreConsumer.updatePartitions(changelogTopic, 
Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, 
null)));
+
+        final ProcessorStateManager stateManager = new 
ProcessorStateManager(taskId,
+                                                                             
Collections.singleton(changelogTopicPartition),
+                                                                             
mockRestoreConsumer,
+                                                                             
false,
+                                                                             
stateDirectory,
+                                                                             
Collections.singletonMap(storeName, changelogTopic));
+
+        stateManager.putOffsetLimit(changelogTopicPartition, 1);
+        // add a record with an offset less than the limit of 1
+        mockRestoreConsumer.bufferRecord(new ConsumerRecord<>(changelogTopic, 
0, 0, 1, 1));
+
+
+        try {
+            stateManager.register(mockStateStore, false, 
mockStateStore.stateRestoreCallback);
+            fail("should have thrown IllegalStateException as end offset has 
changed");
+        } catch (final IllegalStateException e) {
+            // pass
+        }
+
+    }
+
+    @Test
+    public void 
shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws 
Exception {
+        restoreConsumer.updatePartitions(changelogTopic, 
Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, 
null)));
+
+        final ProcessorStateManager stateManager = new 
ProcessorStateManager(taskId,
+                                                                             
Collections.singleton(changelogTopicPartition),
+                                                                             
restoreConsumer,
+                                                                             
false,
+                                                                             
stateDirectory,
+                                                                             
Collections.singletonMap(storeName, changelogTopic));
+
+        final MockStateStoreSupplier.MockStateStore stateStore = new 
MockStateStoreSupplier.MockStateStore(storeName, true) {
+            @Override
+            public void close() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+        stateManager.putOffsetLimit(changelogTopicPartition, 1);
+        restoreConsumer.bufferRecord(new ConsumerRecord<>(changelogTopic, 0, 
1, 1, 1));
+        stateManager.register(stateStore, false, 
stateStore.stateRestoreCallback);
+
+        try {
+            stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+            fail("Should throw ProcessorStateException if store close throws 
exception");
+        } catch (final ProcessorStateException e) {
+            // pass
+        }
+    }
+
+
 }

Reply via email to