http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java new file mode 100644 index 0000000..0a1f95c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class MinTimestampTrackerTest { + + private Stamped<String> elem(long timestamp) { + return new Stamped<>("", timestamp); + } + + @SuppressWarnings("unchecked") + @Test + public void testTracking() { + TimestampTracker<String> tracker = new MinTimestampTracker<>(); + + Object[] elems = new Object[]{ + elem(100), elem(101), elem(102), elem(98), elem(99), elem(100) + }; + + int insertionIndex = 0; + int removalIndex = 0; + + // add 100 + tracker.addElement((Stamped<String>) elems[insertionIndex++]); + assertEquals(100L, tracker.get()); + + // add 101 + tracker.addElement((Stamped<String>) elems[insertionIndex++]); + assertEquals(100L, tracker.get()); + + // remove 100 + tracker.removeElement((Stamped<String>) elems[removalIndex++]); + assertEquals(101L, tracker.get()); + + // add 102 + tracker.addElement((Stamped<String>) elems[insertionIndex++]); + assertEquals(101L, tracker.get()); + + // add 98 + tracker.addElement((Stamped<String>) elems[insertionIndex++]); + assertEquals(98L, tracker.get()); + + // add 99 + tracker.addElement((Stamped<String>) elems[insertionIndex++]); + assertEquals(98L, tracker.get()); + + // add 100 + tracker.addElement((Stamped<String>) elems[insertionIndex++]); + assertEquals(98L, tracker.get()); + + // remove 101 + tracker.removeElement((Stamped<String>) elems[removalIndex++]); + assertEquals(98L, tracker.get()); + + // remove 102 + tracker.removeElement((Stamped<String>) elems[removalIndex++]); + assertEquals(98L, tracker.get()); + + // remove 98 + tracker.removeElement((Stamped<String>) elems[removalIndex++]); + assertEquals(99L, tracker.get()); + + // remove 99 + tracker.removeElement((Stamped<String>) elems[removalIndex++]); + assertEquals(100L, tracker.get()); + + // remove 100 + tracker.removeElement((Stamped<String>) elems[removalIndex++]); + assertEquals(100L, tracker.get()); + + assertEquals(insertionIndex, removalIndex); + } + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java new file mode 100644 index 0000000..b91acdc --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import static org.junit.Assert.assertEquals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.test.MockSourceNode; +import org.apache.kafka.test.MockTimestampExtractor; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +public class PartitionGroupTest { + private final Serializer<Integer> intSerializer = new IntegerSerializer(); + private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); + private final TimestampExtractor timestampExtractor = new MockTimestampExtractor(); + private final TopicPartition partition1 = new TopicPartition("topic", 1); + private final TopicPartition partition2 = new TopicPartition("topic", 2); + private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(intDeserializer, intDeserializer)); + private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(intDeserializer, intDeserializer)); + + private final byte[] recordValue = intSerializer.serialize(null, 10); + private final byte[] recordKey = intSerializer.serialize(null, 1); + + private final PartitionGroup group = new PartitionGroup(new HashMap<TopicPartition, RecordQueue>() { + { + put(partition1, queue1); + put(partition2, queue2); + } + }, timestampExtractor); + + @Test + public void testTimeTracking() { + assertEquals(0, group.numBuffered()); + + // add three 3 records with timestamp 1, 3, 5 to partition-1 + List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 3, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 5, recordKey, recordValue)); + + group.addRawRecords(partition1, list1); + + // add three 3 records with timestamp 2, 4, 6 to partition-2 + List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 6, recordKey, recordValue)); + + group.addRawRecords(partition2, list2); + + assertEquals(6, group.numBuffered()); + assertEquals(3, group.numBuffered(partition1)); + assertEquals(3, group.numBuffered(partition2)); + assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp()); + + StampedRecord record; + PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); + + // get one record + record = group.nextRecord(info); + assertEquals(partition1, info.partition()); + assertEquals(1L, record.timestamp); + assertEquals(5, group.numBuffered()); + assertEquals(2, group.numBuffered(partition1)); + assertEquals(3, group.numBuffered(partition2)); + assertEquals(TimestampTracker.NOT_KNOWN, group.timestamp()); + + // get one record, now the time should be advanced + record = group.nextRecord(info); + assertEquals(partition2, info.partition()); + assertEquals(2L, record.timestamp); + assertEquals(4, group.numBuffered()); + assertEquals(2, group.numBuffered(partition1)); + assertEquals(2, group.numBuffered(partition2)); + assertEquals(3L, group.timestamp()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java new file mode 100644 index 0000000..343ed52 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.processor.RestoreFunc; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.OffsetCheckpoint; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.channels.FileLock; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +public class ProcessorStateManagerTest { + + private static class MockStateStore implements StateStore { + private final String name; + private final boolean persistent; + + public boolean flushed = false; + public boolean closed = false; + public final ArrayList<Integer> keys = new ArrayList<>(); + + public MockStateStore(String name, boolean persistent) { + this.name = name; + this.persistent = persistent; + } + @Override + public String name() { + return name; + } + @Override + public void flush() { + flushed = true; + } + @Override + public void close() { + closed = true; + } + @Override + public boolean persistent() { + return persistent; + } + + public final RestoreFunc restoreFunc = new RestoreFunc() { + private final Deserializer<Integer> deserializer = new IntegerDeserializer(); + + @Override + public void apply(byte[] key, byte[] value) { + keys.add(deserializer.deserialize("", key)); + } + }; + } + + private class MockRestoreConsumer extends MockConsumer<byte[], byte[]> { + private final Serializer<Integer> serializer = new IntegerSerializer(); + + public TopicPartition assignedPartition = null; + public TopicPartition seekPartition = null; + public long seekOffset = -1L; + public boolean seekToBeginingCalled = false; + public boolean seekToEndCalled = false; + private long endOffset = 0L; + private long currentOffset = 0L; + + private ArrayList<ConsumerRecord<byte[], byte[]>> recordBuffer = new ArrayList<>(); + + MockRestoreConsumer() { + super(OffsetResetStrategy.EARLIEST); + + reset(); + } + + // reset this mock restore consumer for a state store registration + public void reset() { + assignedPartition = null; + seekOffset = -1L; + seekToBeginingCalled = false; + seekToEndCalled = false; + endOffset = 0L; + recordBuffer.clear(); + } + + // buffer a record (we cannot use addRecord because we need to add records before asigning a partition) + public void bufferRecord(ConsumerRecord<Integer, Integer> record) { + recordBuffer.add( + new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), + serializer.serialize(record.topic(), record.key()), + serializer.serialize(record.topic(), record.value()))); + endOffset = record.offset(); + + super.updateEndOffsets(Collections.singletonMap(assignedPartition, endOffset)); + } + + @Override + public synchronized void assign(List<TopicPartition> partitions) { + int numPartitions = partitions.size(); + if (numPartitions > 1) + throw new IllegalArgumentException("RestoreConsumer: more than one partition specified"); + + if (numPartitions == 1) { + if (assignedPartition != null) + throw new IllegalStateException("RestoreConsumer: partition already assigned"); + assignedPartition = partitions.get(0); + + // set the beginning offset to 0 + // NOTE: this is users responsible to set the initial lEO. + super.updateBeginningOffsets(Collections.singletonMap(assignedPartition, 0L)); + } + + super.assign(partitions); + } + + @Override + public ConsumerRecords<byte[], byte[]> poll(long timeout) { + // add buffered records to MockConsumer + for (ConsumerRecord<byte[], byte[]> record : recordBuffer) { + super.addRecord(record); + } + recordBuffer.clear(); + + ConsumerRecords<byte[], byte[]> records = super.poll(timeout); + + // set the current offset + Iterable<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(assignedPartition); + for (ConsumerRecord<byte[], byte[]> record : partitionRecords) { + currentOffset = record.offset(); + } + + return records; + } + + @Override + public synchronized long position(TopicPartition partition) { + if (!partition.equals(assignedPartition)) + throw new IllegalStateException("RestoreConsumer: unassigned partition"); + + return currentOffset; + } + + @Override + public synchronized void seek(TopicPartition partition, long offset) { + if (offset < 0) + throw new IllegalArgumentException("RestoreConsumer: offset should not be negative"); + + if (seekOffset >= 0) + throw new IllegalStateException("RestoreConsumer: offset already seeked"); + + seekPartition = partition; + seekOffset = offset; + currentOffset = offset; + super.seek(partition, offset); + } + + @Override + public synchronized void seekToBeginning(TopicPartition... partitions) { + if (partitions.length != 1) + throw new IllegalStateException("RestoreConsumer: other than one partition specified"); + + for (TopicPartition partition : partitions) { + if (!partition.equals(assignedPartition)) + throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition"); + } + + seekToBeginingCalled = true; + currentOffset = 0L; + } + + @Override + public synchronized void seekToEnd(TopicPartition... partitions) { + if (partitions.length != 1) + throw new IllegalStateException("RestoreConsumer: other than one partition specified"); + + for (TopicPartition partition : partitions) { + if (!partition.equals(assignedPartition)) + throw new IllegalStateException("RestoreConsumer: seek-to-end not on the assigned partition"); + } + + seekToEndCalled = true; + currentOffset = endOffset; + } + } + + @Test + public void testLockStateDirectory() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + FileLock lock; + + // the state manager locks the directory + ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer()); + + try { + // this should not get the lock + lock = ProcessorStateManager.lockStateDirectory(baseDir); + assertNull(lock); + } finally { + // by closing the state manager, release the lock + stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); + } + + // now, this should get the lock + lock = ProcessorStateManager.lockStateDirectory(baseDir); + try { + assertNotNull(lock); + } finally { + if (lock != null) lock.release(); + } + } finally { + Utils.delete(baseDir); + } + } + + @Test(expected = IllegalStateException.class) + public void testNoTopic() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + MockStateStore mockStateStore = new MockStateStore("mockStore", false); + + ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer()); + try { + stateMgr.register(mockStateStore, mockStateStore.restoreFunc); + } finally { + stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); + } + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testRegisterPersistentStore() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + long lastCheckpointedOffset = 10L; + OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); + checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset)); + + MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + restoreConsumer.updatePartitions("persistentStore", Arrays.asList( + new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("persistentStore", 2, Node.noNode(), new Node[0], new Node[0]) + )); + restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L)); + + MockStateStore persistentStore = new MockStateStore("persistentStore", false); // non persistent store + + ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer); + try { + restoreConsumer.reset(); + + ArrayList<Integer> expectedKeys = new ArrayList<>(); + for (int i = 1; i <= 3; i++) { + long offset = (long) i; + int key = i * 10; + expectedKeys.add(key); + restoreConsumer.bufferRecord( + new ConsumerRecord<>("persistentStore", 2, offset, key, 0) + ); + } + + stateMgr.register(persistentStore, persistentStore.restoreFunc); + + assertEquals(new TopicPartition("persistentStore", 2), restoreConsumer.assignedPartition); + assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset); + assertFalse(restoreConsumer.seekToBeginingCalled); + assertTrue(restoreConsumer.seekToEndCalled); + assertEquals(expectedKeys, persistentStore.keys); + + } finally { + stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); + } + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testRegisterNonPersistentStore() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + long lastCheckpointedOffset = 10L; + OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); + checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset)); + + MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList( + new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("nonPersistentStore", 2, Node.noNode(), new Node[0], new Node[0]) + )); + restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L)); + + MockStateStore nonPersistentStore = new MockStateStore("nonPersistentStore", true); // persistent store + + ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer); + try { + restoreConsumer.reset(); + + ArrayList<Integer> expectedKeys = new ArrayList<>(); + for (int i = 1; i <= 3; i++) { + long offset = (long) (i + 100); + int key = i; + expectedKeys.add(i); + restoreConsumer.bufferRecord( + new ConsumerRecord<>("nonPersistentStore", 2, offset, key, 0) + ); + } + + stateMgr.register(nonPersistentStore, nonPersistentStore.restoreFunc); + + assertEquals(new TopicPartition("nonPersistentStore", 2), restoreConsumer.assignedPartition); + assertEquals(0L, restoreConsumer.seekOffset); + assertTrue(restoreConsumer.seekToBeginingCalled); + assertTrue(restoreConsumer.seekToEndCalled); + assertEquals(expectedKeys, nonPersistentStore.keys); + } finally { + stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); + } + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testGetStore() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + restoreConsumer.updatePartitions("mockStore", Arrays.asList( + new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0]) + )); + + MockStateStore mockStateStore = new MockStateStore("mockStore", false); + + ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer); + try { + stateMgr.register(mockStateStore, mockStateStore.restoreFunc); + + assertNull(stateMgr.getStore("noSuchStore")); + assertEquals(mockStateStore, stateMgr.getStore("mockStore")); + + } finally { + stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); + } + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testClose() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + File checkpointFile = new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME); + try { + // write an empty checkpoint file + OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile); + oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap()); + + MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + restoreConsumer.updatePartitions("persistentStore", Arrays.asList( + new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]) + )); + restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList( + new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]) + )); + + // set up ack'ed offsets + HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>(); + ackedOffsets.put(new TopicPartition("persistentStore", 1), 123L); + ackedOffsets.put(new TopicPartition("nonPersistentStore", 1), 456L); + ackedOffsets.put(new TopicPartition("otherTopic", 1), 789L); + + MockStateStore persistentStore = new MockStateStore("persistentStore", true); + MockStateStore nonPersistentStore = new MockStateStore("nonPersistentStore", false); + + ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer); + try { + // make sure the checkpoint file is deleted + assertFalse(checkpointFile.exists()); + + restoreConsumer.reset(); + stateMgr.register(persistentStore, persistentStore.restoreFunc); + + restoreConsumer.reset(); + stateMgr.register(nonPersistentStore, nonPersistentStore.restoreFunc); + } finally { + // close the state manager with the ack'ed offsets + stateMgr.close(ackedOffsets); + } + + // make sure all stores are closed, and the checkpoint file is written. + assertTrue(persistentStore.flushed); + assertTrue(persistentStore.closed); + assertTrue(nonPersistentStore.flushed); + assertTrue(nonPersistentStore.closed); + assertTrue(checkpointFile.exists()); + + // the checkpoint file should contain an offset from the persistent store only. + OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile); + Map<TopicPartition, Long> checkpointedOffsets = newCheckpoint.read(); + assertEquals(1, checkpointedOffsets.size()); + assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition("persistentStore", 1))); + } finally { + Utils.delete(baseDir); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java new file mode 100644 index 0000000..1abb989 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.state.InMemoryKeyValueStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.ProcessorTopologyTestDriver; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.Properties; + +public class ProcessorTopologyTest { + + private static final Serializer<String> STRING_SERIALIZER = new StringSerializer(); + private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer(); + private static final File STATE_DIR = new File("build/data").getAbsoluteFile(); + + protected static final String INPUT_TOPIC = "input-topic"; + protected static final String OUTPUT_TOPIC_1 = "output-topic-1"; + protected static final String OUTPUT_TOPIC_2 = "output-topic-2"; + + private static long timestamp = 1000L; + + private ProcessorTopologyTestDriver driver; + private StreamingConfig config; + + @Before + public void setup() { + STATE_DIR.mkdirs(); + Properties props = new Properties(); + props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); + props.setProperty(StreamingConfig.STATE_DIR_CONFIG, STATE_DIR.getAbsolutePath()); + props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); + props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + this.config = new StreamingConfig(props); + } + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + driver = null; + if (STATE_DIR.exists()) { + try { + Files.walkFileTree(STATE_DIR.toPath(), new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + Files.delete(file); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + Files.delete(dir); + return FileVisitResult.CONTINUE; + } + + }); + } catch (IOException e) { + // do nothing + } + } + } + + @Test + public void testTopologyMetadata() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("source-1", "topic-1"); + builder.addSource("source-2", "topic-2", "topic-3"); + builder.addProcessor("processor-1", new MockProcessorDef(), "source-1"); + builder.addProcessor("processor-2", new MockProcessorDef(), "source-1", "source-2"); + builder.addSink("sink-1", "topic-3", "processor-1"); + builder.addSink("sink-2", "topic-4", "processor-1", "processor-2"); + + final ProcessorTopology topology = builder.build(); + + assertEquals(6, topology.processors().size()); + + assertEquals(2, topology.sources().size()); + + assertEquals(3, topology.sourceTopics().size()); + + assertNotNull(topology.source("topic-1")); + + assertNotNull(topology.source("topic-2")); + + assertNotNull(topology.source("topic-3")); + + assertEquals(topology.source("topic-2"), topology.source("topic-3")); + } + + @Test + public void testDrivingSimpleTopology() { + driver = new ProcessorTopologyTestDriver(config, createSimpleTopology()); + driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1"); + assertNoOutputRecord(OUTPUT_TOPIC_2); + + driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2"); + assertNoOutputRecord(OUTPUT_TOPIC_2); + + driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER); + assertNoOutputRecord(OUTPUT_TOPIC_2); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3"); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4"); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5"); + } + + @Test + public void testDrivingMultiplexingTopology() { + driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology()); + driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)"); + + driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)"); + + driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)"); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)"); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3(2)"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key4", "value4(2)"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key5", "value5(2)"); + } + + @Test + public void testDrivingStatefulTopology() { + String storeName = "entries"; + driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName); + driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER); + assertNoOutputRecord(OUTPUT_TOPIC_1); + + KeyValueStore<String, String> store = driver.getKeyValueStore("entries"); + assertEquals("value4", store.get("key1")); + assertEquals("value2", store.get("key2")); + assertEquals("value3", store.get("key3")); + assertNull(store.get("key4")); + } + + protected void assertNextOutputRecord(String topic, String key, String value) { + assertProducerRecord(driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER), topic, key, value); + } + + protected void assertNoOutputRecord(String topic) { + assertNull(driver.readOutput(topic)); + } + + private void assertProducerRecord(ProducerRecord<String, String> record, String topic, String key, String value) { + assertEquals(topic, record.topic()); + assertEquals(key, record.key()); + assertEquals(value, record.value()); + // Kafka Streaming doesn't set the partition, so it's always null + assertNull(record.partition()); + } + + protected TopologyBuilder createSimpleTopology() { + return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC) + .addProcessor("processor", define(new ForwardingProcessor()), "source") + .addSink("sink", OUTPUT_TOPIC_1, "processor"); + } + + protected TopologyBuilder createMultiplexingTopology() { + return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC) + .addProcessor("processor", define(new MultiplexingProcessor(2)), "source") + .addSink("sink1", OUTPUT_TOPIC_1, "processor") + .addSink("sink2", OUTPUT_TOPIC_2, "processor"); + } + + protected TopologyBuilder createStatefulTopology(String storeName) { + return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC) + .addProcessor("processor", define(new StatefulProcessor(storeName)), "source") + .addSink("counts", OUTPUT_TOPIC_1, "processor"); + } + + /** + * A processor that simply forwards all messages to all children. + */ + protected static class ForwardingProcessor extends AbstractProcessor<String, String> { + + @Override + public void process(String key, String value) { + context().forward(key, value); + } + + @Override + public void punctuate(long streamTime) { + context().forward(Long.toString(streamTime), "punctuate"); + } + } + + /** + * A processor that forwards slightly-modified messages to each child. + */ + protected static class MultiplexingProcessor extends AbstractProcessor<String, String> { + + private final int numChildren; + + public MultiplexingProcessor(int numChildren) { + this.numChildren = numChildren; + } + + @Override + public void process(String key, String value) { + for (int i = 0; i != numChildren; ++i) { + context().forward(key, value + "(" + (i + 1) + ")", i); + } + } + + @Override + public void punctuate(long streamTime) { + for (int i = 0; i != numChildren; ++i) { + context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", i); + } + } + } + + /** + * A processor that stores each key-value pair in an in-memory key-value store registered with the context. When + * {@link #punctuate(long)} is called, it outputs the total number of entries in the store. + */ + protected static class StatefulProcessor extends AbstractProcessor<String, String> { + + private KeyValueStore<String, String> store; + private final String storeName; + + public StatefulProcessor(String storeName) { + this.storeName = storeName; + } + + @Override + public void init(ProcessorContext context) { + super.init(context); + store = new InMemoryKeyValueStore<>(storeName, context); + } + + @Override + public void process(String key, String value) { + store.put(key, value); + } + + @Override + public void punctuate(long streamTime) { + int count = 0; + for (KeyValueIterator<String, String> iter = store.all(); iter.hasNext();) { + iter.next(); + ++count; + } + context().forward(Long.toString(streamTime), count); + } + } + + protected ProcessorDef define(final Processor processor) { + return new ProcessorDef() { + @Override + public Processor instance() { + return processor; + } + }; + } + + public static class CustomTimestampExtractor implements TimestampExtractor { + @Override + public long extract(ConsumerRecord<Object, Object> record) { + return timestamp; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java new file mode 100644 index 0000000..b1403bd --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; + +public class PunctuationQueueTest { + + @Test + public void testPunctuationInterval() { + TestProcessor processor = new TestProcessor(); + ProcessorNode<String, String> node = new ProcessorNode<>("test", processor); + PunctuationQueue queue = new PunctuationQueue(); + + PunctuationSchedule sched = new PunctuationSchedule(node, 100L); + final long now = sched.timestamp - 100L; + + queue.schedule(sched); + + Punctuator punctuator = new Punctuator() { + public void punctuate(ProcessorNode node, long time) { + node.processor().punctuate(time); + } + }; + + queue.mayPunctuate(now, punctuator); + assertEquals(0, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 99L, punctuator); + assertEquals(0, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 100L, punctuator); + assertEquals(1, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 199L, punctuator); + assertEquals(1, processor.punctuatedAt.size()); + + queue.mayPunctuate(now + 200L, punctuator); + assertEquals(2, processor.punctuatedAt.size()); + } + + private static class TestProcessor implements Processor<String, String> { + + public final ArrayList<Long> punctuatedAt = new ArrayList<>(); + + @Override + public void init(ProcessorContext context) { + } + + @Override + public void process(String key, String value) { + } + + @Override + public void punctuate(long streamTime) { + punctuatedAt.add(streamTime); + } + + @Override + public void close() { + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java new file mode 100644 index 0000000..6e86410 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.test.MockSourceNode; +import org.apache.kafka.test.MockTimestampExtractor; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class RecordQueueTest { + private final Serializer<Integer> intSerializer = new IntegerSerializer(); + private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); + private final TimestampExtractor timestampExtractor = new MockTimestampExtractor(); + private final RecordQueue queue = new RecordQueue(new TopicPartition("topic", 1), new MockSourceNode<>(intDeserializer, intDeserializer)); + + private final byte[] recordValue = intSerializer.serialize(null, 10); + private final byte[] recordKey = intSerializer.serialize(null, 1); + + @Test + public void testTimeTracking() { + + assertTrue(queue.isEmpty()); + + // add three 3 out-of-order records with timestamp 2, 1, 3 + List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 3, recordKey, recordValue)); + + queue.addRawRecords(list1, timestampExtractor); + + assertEquals(3, queue.size()); + assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp()); + + // poll the first record, now with 1, 3 + assertEquals(2L, queue.poll().timestamp); + assertEquals(2, queue.size()); + assertEquals(1L, queue.timestamp()); + + // poll the second record, now with 3 + assertEquals(1L, queue.poll().timestamp); + assertEquals(1, queue.size()); + assertEquals(3L, queue.timestamp()); + + // add three 3 out-of-order records with timestamp 4, 1, 2 + // now with 3, 4, 1, 2 + List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 1, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 2, recordKey, recordValue)); + + queue.addRawRecords(list2, timestampExtractor); + + assertEquals(4, queue.size()); + assertEquals(3L, queue.timestamp()); + + // poll the third record, now with 4, 1, 2 + assertEquals(3L, queue.poll().timestamp); + assertEquals(3, queue.size()); + assertEquals(3L, queue.timestamp()); + + // poll the rest records + assertEquals(4L, queue.poll().timestamp); + assertEquals(3L, queue.timestamp()); + + assertEquals(1L, queue.poll().timestamp); + assertEquals(3L, queue.timestamp()); + + assertEquals(2L, queue.poll().timestamp); + assertEquals(0, queue.size()); + assertEquals(3L, queue.timestamp()); + + // add three more records with 4, 5, 6 + List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList( + new ConsumerRecord<>("topic", 1, 4, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 5, recordKey, recordValue), + new ConsumerRecord<>("topic", 1, 6, recordKey, recordValue)); + + queue.addRawRecords(list3, timestampExtractor); + + assertEquals(3, queue.size()); + assertEquals(3L, queue.timestamp()); + + // poll one record again, the timestamp should advance now + assertEquals(4L, queue.poll().timestamp); + assertEquals(2, queue.size()); + assertEquals(5L, queue.timestamp()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java new file mode 100644 index 0000000..8dcfc40 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.test.MockSourceNode; +import org.junit.Test; +import org.junit.Before; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StreamTaskTest { + + private final Serializer<Integer> intSerializer = new IntegerSerializer(); + private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); + private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer(); + + private final TopicPartition partition1 = new TopicPartition("topic1", 1); + private final TopicPartition partition2 = new TopicPartition("topic2", 1); + private final HashSet<TopicPartition> partitions = new HashSet<>(Arrays.asList(partition1, partition2)); + + private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final ProcessorTopology topology = new ProcessorTopology( + Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2), + new HashMap<String, SourceNode>() { + { + put("topic1", source1); + put("topic2", source2); + } + }); + + private final StreamingConfig config = new StreamingConfig(new Properties() { + { + setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); + setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); + setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); + } + }); + + private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer); + private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + + private final byte[] recordValue = intSerializer.serialize(null, 10); + private final byte[] recordKey = intSerializer.serialize(null, 1); + + + @Before + public void setup() { + consumer.assign(Arrays.asList(partition1, partition2)); + } + + @SuppressWarnings("unchecked") + @Test + public void testProcessOrder() { + StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config); + + task.addRecords(partition1, records( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue) + )); + + task.addRecords(partition2, records( + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue) + )); + + assertEquals(task.process(), 5); + assertEquals(source1.numReceived, 1); + assertEquals(source2.numReceived, 0); + + assertEquals(task.process(), 4); + assertEquals(source1.numReceived, 1); + assertEquals(source2.numReceived, 1); + + assertEquals(task.process(), 3); + assertEquals(source1.numReceived, 2); + assertEquals(source2.numReceived, 1); + + assertEquals(task.process(), 2); + assertEquals(source1.numReceived, 3); + assertEquals(source2.numReceived, 1); + + assertEquals(task.process(), 1); + assertEquals(source1.numReceived, 3); + assertEquals(source2.numReceived, 2); + + assertEquals(task.process(), 0); + assertEquals(source1.numReceived, 3); + assertEquals(source2.numReceived, 3); + + task.close(); + } + + @SuppressWarnings("unchecked") + @Test + public void testPauseResume() { + StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config); + + task.addRecords(partition1, records( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey, recordValue) + )); + + task.addRecords(partition2, records( + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey, recordValue) + )); + + assertEquals(task.process(), 5); + assertEquals(source1.numReceived, 1); + assertEquals(source2.numReceived, 0); + + assertEquals(consumer.paused().size(), 1); + assertTrue(consumer.paused().contains(partition2)); + + task.addRecords(partition1, records( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey, recordValue) + )); + + assertEquals(consumer.paused().size(), 2); + assertTrue(consumer.paused().contains(partition1)); + assertTrue(consumer.paused().contains(partition2)); + + assertEquals(task.process(), 7); + assertEquals(source1.numReceived, 1); + assertEquals(source2.numReceived, 1); + + assertEquals(consumer.paused().size(), 1); + assertTrue(consumer.paused().contains(partition1)); + + assertEquals(task.process(), 6); + assertEquals(source1.numReceived, 2); + assertEquals(source2.numReceived, 1); + + assertEquals(consumer.paused().size(), 0); + + task.close(); + } + + private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) { + return Arrays.asList(recs); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java new file mode 100644 index 0000000..1f3e541 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -0,0 +1,389 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +public class StreamThreadTest { + + private TopicPartition t1p1 = new TopicPartition("topic1", 1); + private TopicPartition t1p2 = new TopicPartition("topic1", 2); + private TopicPartition t2p1 = new TopicPartition("topic2", 1); + private TopicPartition t2p2 = new TopicPartition("topic2", 2); + + private Properties configProps() { + return new Properties() { + { + setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); + setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); + setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); + } + }; + } + + private static class TestStreamTask extends StreamTask { + public boolean committed = false; + + public TestStreamTask(int id, + Consumer<byte[], byte[]> consumer, + Producer<byte[], byte[]> producer, + Consumer<byte[], byte[]> restoreConsumer, + Collection<TopicPartition> partitions, + ProcessorTopology topology, + StreamingConfig config) { + super(id, consumer, producer, restoreConsumer, partitions, topology, config); + } + + @Override + public void commit() { + super.commit(); + committed = true; + } + } + + private ByteArraySerializer serializer = new ByteArraySerializer(); + + @SuppressWarnings("unchecked") + @Test + public void testPartitionAssignmentChange() throws Exception { + StreamingConfig config = new StreamingConfig(configProps()); + + MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); + MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + builder.addSource("source2", "topic2"); + + StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, new SystemTime()) { + @Override + protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) { + return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config); + } + }; + + ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; + + assertTrue(thread.tasks().isEmpty()); + + List<TopicPartition> revokedPartitions; + List<TopicPartition> assignedPartitions; + Set<TopicPartition> expectedGroup1; + Set<TopicPartition> expectedGroup2; + + revokedPartitions = Collections.emptyList(); + assignedPartitions = Collections.singletonList(t1p1); + expectedGroup1 = new HashSet<>(Arrays.asList(t1p1)); + + rebalanceListener.onPartitionsRevoked(revokedPartitions); + rebalanceListener.onPartitionsAssigned(assignedPartitions); + + assertTrue(thread.tasks().containsKey(1)); + assertEquals(expectedGroup1, thread.tasks().get(1).partitions()); + assertEquals(1, thread.tasks().size()); + + revokedPartitions = assignedPartitions; + assignedPartitions = Collections.singletonList(t1p2); + expectedGroup2 = new HashSet<>(Arrays.asList(t1p2)); + + rebalanceListener.onPartitionsRevoked(revokedPartitions); + rebalanceListener.onPartitionsAssigned(assignedPartitions); + + assertTrue(thread.tasks().containsKey(2)); + assertEquals(expectedGroup2, thread.tasks().get(2).partitions()); + assertEquals(1, thread.tasks().size()); + + revokedPartitions = assignedPartitions; + assignedPartitions = Arrays.asList(t1p1, t1p2); + expectedGroup1 = new HashSet<>(Collections.singleton(t1p1)); + expectedGroup2 = new HashSet<>(Collections.singleton(t1p2)); + + rebalanceListener.onPartitionsRevoked(revokedPartitions); + rebalanceListener.onPartitionsAssigned(assignedPartitions); + + assertTrue(thread.tasks().containsKey(1)); + assertTrue(thread.tasks().containsKey(2)); + assertEquals(expectedGroup1, thread.tasks().get(1).partitions()); + assertEquals(expectedGroup2, thread.tasks().get(2).partitions()); + assertEquals(2, thread.tasks().size()); + + revokedPartitions = assignedPartitions; + assignedPartitions = Arrays.asList(t1p1, t1p2, t2p1, t2p2); + expectedGroup1 = new HashSet<>(Arrays.asList(t1p1, t2p1)); + expectedGroup2 = new HashSet<>(Arrays.asList(t1p2, t2p2)); + + rebalanceListener.onPartitionsRevoked(revokedPartitions); + rebalanceListener.onPartitionsAssigned(assignedPartitions); + + assertTrue(thread.tasks().containsKey(1)); + assertTrue(thread.tasks().containsKey(2)); + assertEquals(expectedGroup1, thread.tasks().get(1).partitions()); + assertEquals(expectedGroup2, thread.tasks().get(2).partitions()); + assertEquals(2, thread.tasks().size()); + + revokedPartitions = assignedPartitions; + assignedPartitions = Collections.emptyList(); + + rebalanceListener.onPartitionsRevoked(revokedPartitions); + rebalanceListener.onPartitionsAssigned(assignedPartitions); + + assertTrue(thread.tasks().isEmpty()); + } + + @Test + public void testMaybeClean() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + final long cleanupDelay = 1000L; + Properties props = configProps(); + props.setProperty(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay)); + props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); + + StreamingConfig config = new StreamingConfig(props); + + File stateDir1 = new File(baseDir, "1"); + File stateDir2 = new File(baseDir, "2"); + File stateDir3 = new File(baseDir, "3"); + File extraDir = new File(baseDir, "X"); + stateDir1.mkdir(); + stateDir2.mkdir(); + stateDir3.mkdir(); + extraDir.mkdir(); + + MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); + MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + MockTime mockTime = new MockTime(); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + + StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, mockTime) { + @Override + public void maybeClean() { + super.maybeClean(); + } + @Override + protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) { + return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config); + } + }; + + ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; + + assertTrue(thread.tasks().isEmpty()); + mockTime.sleep(cleanupDelay); + + // all directories exist since an assignment didn't happen + assertTrue(stateDir1.exists()); + assertTrue(stateDir2.exists()); + assertTrue(stateDir3.exists()); + assertTrue(extraDir.exists()); + + List<TopicPartition> revokedPartitions; + List<TopicPartition> assignedPartitions; + Map<Integer, StreamTask> prevTasks; + + // + // Assign t1p1 and t1p2. This should create Task 1 & 2 + // + revokedPartitions = Collections.emptyList(); + assignedPartitions = Arrays.asList(t1p1, t1p2); + prevTasks = new HashMap(thread.tasks()); + + rebalanceListener.onPartitionsRevoked(revokedPartitions); + rebalanceListener.onPartitionsAssigned(assignedPartitions); + + // there shouldn't be any previous task + assertTrue(prevTasks.isEmpty()); + + // task 1 & 2 are created + assertEquals(2, thread.tasks().size()); + + // all directories should still exit before the cleanup delay time + mockTime.sleep(cleanupDelay - 10L); + thread.maybeClean(); + assertTrue(stateDir1.exists()); + assertTrue(stateDir2.exists()); + assertTrue(stateDir3.exists()); + assertTrue(extraDir.exists()); + + // all state directories except for task 1 & 2 will be removed. the extra directory should still exists + mockTime.sleep(11L); + thread.maybeClean(); + assertTrue(stateDir1.exists()); + assertTrue(stateDir2.exists()); + assertFalse(stateDir3.exists()); + assertTrue(extraDir.exists()); + + // + // Revoke t1p1 and t1p2. This should remove Task 1 & 2 + // + revokedPartitions = assignedPartitions; + assignedPartitions = Collections.emptyList(); + prevTasks = new HashMap(thread.tasks()); + + rebalanceListener.onPartitionsRevoked(revokedPartitions); + rebalanceListener.onPartitionsAssigned(assignedPartitions); + + // previous tasks should be committed + assertEquals(2, prevTasks.size()); + for (StreamTask task : prevTasks.values()) { + assertTrue(((TestStreamTask) task).committed); + ((TestStreamTask) task).committed = false; + } + + // no task + assertTrue(thread.tasks().isEmpty()); + + // all state directories for task 1 & 2 still exist before the cleanup delay time + mockTime.sleep(cleanupDelay - 10L); + thread.maybeClean(); + assertTrue(stateDir1.exists()); + assertTrue(stateDir2.exists()); + assertFalse(stateDir3.exists()); + assertTrue(extraDir.exists()); + + // all state directories for task 1 & 2 are removed + mockTime.sleep(11L); + thread.maybeClean(); + assertFalse(stateDir1.exists()); + assertFalse(stateDir2.exists()); + assertFalse(stateDir3.exists()); + assertTrue(extraDir.exists()); + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testMaybeCommit() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + final long commitInterval = 1000L; + Properties props = configProps(); + props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); + props.setProperty(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); + + StreamingConfig config = new StreamingConfig(props); + + MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); + MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + final MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockTime mockTime = new MockTime(); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + + StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, mockTime) { + @Override + public void maybeCommit() { + super.maybeCommit(); + } + @Override + protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) { + return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config); + } + }; + + ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; + + List<TopicPartition> revokedPartitions; + List<TopicPartition> assignedPartitions; + + // + // Assign t1p1 and t1p2. This should create Task 1 & 2 + // + revokedPartitions = Collections.emptyList(); + assignedPartitions = Arrays.asList(t1p1, t1p2); + + rebalanceListener.onPartitionsRevoked(revokedPartitions); + rebalanceListener.onPartitionsAssigned(assignedPartitions); + + assertEquals(2, thread.tasks().size()); + + // no task is committed before the commit interval + mockTime.sleep(commitInterval - 10L); + thread.maybeCommit(); + for (StreamTask task : thread.tasks().values()) { + assertFalse(((TestStreamTask) task).committed); + } + + // all tasks are committed after the commit interval + mockTime.sleep(11L); + thread.maybeCommit(); + for (StreamTask task : thread.tasks().values()) { + assertTrue(((TestStreamTask) task).committed); + ((TestStreamTask) task).committed = false; + } + + // no task is committed before the commit interval, again + mockTime.sleep(commitInterval - 10L); + thread.maybeCommit(); + for (StreamTask task : thread.tasks().values()) { + assertFalse(((TestStreamTask) task).committed); + } + + // all tasks are committed after the commit interval, again + mockTime.sleep(11L); + thread.maybeCommit(); + for (StreamTask task : thread.tasks().values()) { + assertTrue(((TestStreamTask) task).committed); + ((TestStreamTask) task).committed = false; + } + + } finally { + Utils.delete(baseDir); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java new file mode 100644 index 0000000..2c42e6c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.test; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.processor.internals.ProcessorTopology; + +import java.util.List; + +public class KStreamTestDriver { + + private final ProcessorTopology topology; + private final MockProcessorContext context; + private ProcessorNode currNode; + + public KStreamTestDriver(KStreamBuilder builder) { + this(builder, null, null); + } + + public KStreamTestDriver(KStreamBuilder builder, Serializer<?> serializer, Deserializer<?> deserializer) { + this.topology = builder.build(); + this.context = new MockProcessorContext(this, serializer, deserializer); + + for (ProcessorNode node : topology.processors()) { + currNode = node; + try { + node.init(context); + } finally { + currNode = null; + } + } + } + + public void process(String topicName, Object key, Object value) { + currNode = topology.source(topicName); + try { + forward(key, value); + } finally { + currNode = null; + } + } + + public void setTime(long timestamp) { + context.setTime(timestamp); + } + + public StateStore getStateStore(String name) { + return context.getStateStore(name); + } + + @SuppressWarnings("unchecked") + public <K, V> void forward(K key, V value) { + ProcessorNode thisNode = currNode; + for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) { + currNode = childNode; + try { + childNode.process(key, value); + } finally { + currNode = thisNode; + } + } + } + + @SuppressWarnings("unchecked") + public <K, V> void forward(K key, V value, int childIndex) { + ProcessorNode thisNode = currNode; + ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex); + currNode = childNode; + try { + childNode.process(key, value); + } finally { + currNode = thisNode; + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java new file mode 100644 index 0000000..3fdfc82 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.test; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.RestoreFunc; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +public class MockProcessorContext implements ProcessorContext { + + private final KStreamTestDriver driver; + private final Serializer serializer; + private final Deserializer deserializer; + + private Map<String, StateStore> storeMap = new HashMap<>(); + + long timestamp = -1L; + + public MockProcessorContext(KStreamTestDriver driver, Serializer<?> serializer, Deserializer<?> deserializer) { + this.driver = driver; + this.serializer = serializer; + this.deserializer = deserializer; + } + + public void setTime(long timestamp) { + this.timestamp = timestamp; + } + + public int id() { + return -1; + } + + @Override + public boolean joinable() { + return true; + } + + @Override + public Serializer<?> keySerializer() { + return serializer; + } + + @Override + public Serializer<?> valueSerializer() { + return serializer; + } + + @Override + public Deserializer<?> keyDeserializer() { + return deserializer; + } + + @Override + public Deserializer<?> valueDeserializer() { + return deserializer; + } + + @Override + public File stateDir() { + throw new UnsupportedOperationException("stateDir() not supported."); + } + + @Override + public Metrics metrics() { + throw new UnsupportedOperationException("metrics() not supported."); + } + + @Override + public void register(StateStore store, RestoreFunc func) { + if (func != null) throw new UnsupportedOperationException("RestoreFunc not supported."); + storeMap.put(store.name(), store); + } + + @Override + public StateStore getStateStore(String name) { + return storeMap.get(name); + } + + @Override + public void schedule(long interval) { + throw new UnsupportedOperationException("schedule() not supported"); + } + + @Override + @SuppressWarnings("unchecked") + public <K, V> void forward(K key, V value) { + driver.forward(key, value); + } + + @Override + @SuppressWarnings("unchecked") + public <K, V> void forward(K key, V value, int childIndex) { + driver.forward(key, value, childIndex); + } + + @Override + public void commit() { + throw new UnsupportedOperationException("commit() not supported."); + } + + @Override + public String topic() { + throw new UnsupportedOperationException("topic() not supported."); + } + + @Override + public int partition() { + throw new UnsupportedOperationException("partition() not supported."); + } + + @Override + public long offset() { + throw new UnsupportedOperationException("offset() not supported."); + } + + @Override + public long timestamp() { + return this.timestamp; + } + +}
