Repository: samza Updated Branches: refs/heads/master 5e68d621a -> 9961023f7
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java new file mode 100644 index 0000000..5b3b335 --- /dev/null +++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link KinesisSystemFactory}: consumer/admin creation and the configurations for which
+ * {@code getAdmin} is expected to throw a {@link ConfigException}.
+ */
+public class TestKinesisSystemFactory {
+  /** Name of the system every test in this class configures. */
+  private static final String SYSTEM_NAME = "test";
+  /** {@link String#format} template (not a regex) of the config key naming a system's factory class. */
+  private static final String SYSTEM_FACTORY_KEY_FORMAT = "systems.%s.samza.factory";
+  /** {@link String#format} template of the config key that marks a stream of a system as a bootstrap stream. */
+  private static final String BOOTSTRAP_KEY_FORMAT = "systems.%s.streams.%s.samza.bootstrap";
+  private static final String KINESIS_SYSTEM_FACTORY = KinesisSystemFactory.class.getName();
+  /** Grouper factory used by the default configuration built in this class. */
+  private static final String ALL_SSP_TO_SINGLE_TASK_GROUPER_FACTORY =
+      AllSspToSingleTaskGrouperFactory.class.getName();
+
+  /**
+   * Builds a consumer and an admin from the same factory and configuration and checks that they are
+   * not the same object.
+   */
+  @Test
+  public void testGetConsumer() {
+    Config config = buildKinesisConsumerConfig(SYSTEM_NAME);
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    MetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
+    // Use the same system name for both calls; the consumer call previously repeated the literal "test".
+    Assert.assertNotSame(factory.getConsumer(SYSTEM_NAME, config, metricsRegistry),
+        factory.getAdmin(SYSTEM_NAME, config));
+  }
+
+  /**
+   * Expects {@code getAdmin} to throw {@link ConfigException} when the configured SSP grouper factory is
+   * not {@link AllSspToSingleTaskGrouperFactory}. Currently disabled via {@code @Ignore}.
+   */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithIncorrectSspGrouper() {
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(SYSTEM_NAME,
+        "org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory");
+    factory.getAdmin(SYSTEM_NAME, config);
+  }
+
+  /**
+   * Expects {@code getAdmin} to throw {@link ConfigException} when {@code task.broadcast.inputs} is set.
+   * Currently disabled via {@code @Ignore}.
+   */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithBroadcastStreams() {
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(SYSTEM_NAME,
+        ALL_SSP_TO_SINGLE_TASK_GROUPER_FACTORY,
+        "test.stream#0");
+    factory.getAdmin(SYSTEM_NAME, config);
+  }
+
+  /**
+   * Expects {@code getAdmin} to throw {@link ConfigException} when a stream of the system is flagged as a
+   * bootstrap stream. Currently disabled via {@code @Ignore}.
+   */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithBootstrapStream() {
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(SYSTEM_NAME,
+        ALL_SSP_TO_SINGLE_TASK_GROUPER_FACTORY,
+        null,
+        "kinesis-stream"
+    );
+    factory.getAdmin(SYSTEM_NAME, config);
+  }
+
+  /** Builds a config for {@code systemName} with the default grouper and no broadcast/bootstrap streams. */
+  private static Config buildKinesisConsumerConfig(String systemName) {
+    return buildKinesisConsumerConfig(systemName, ALL_SSP_TO_SINGLE_TASK_GROUPER_FACTORY);
+  }
+
+  /** Builds a config for {@code systemName} with the given grouper factory and no broadcast/bootstrap streams. */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory) {
+    return buildKinesisConsumerConfig(systemName, sspGrouperFactory, null);
+  }
+
+  /** Builds a config with the given grouper factory and broadcast inputs, and no bootstrap stream. */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue) {
+    return buildKinesisConsumerConfig(systemName, sspGrouperFactory, broadcastStreamConfigValue, null);
+  }
+
+  /**
+   * Builds a config with every knob this class exercises.
+   *
+   * @param systemName name of the system to configure
+   * @param sspGrouperFactory value for {@code job.systemstreampartition.grouper.factory}
+   * @param broadcastStreamConfigValue value for {@code task.broadcast.inputs}; skipped when null or empty
+   * @param bootstrapStreamName stream to flag as bootstrap; skipped when null or empty
+   */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue, String bootstrapStreamName) {
+    Map<String, String> props = buildSamzaKinesisSystemConfig(systemName, sspGrouperFactory, broadcastStreamConfigValue,
+        bootstrapStreamName);
+    return new MapConfig(props);
+  }
+
+  /** Assembles the raw key/value pairs behind {@link #buildKinesisConsumerConfig(String, String, String, String)}. */
+  private static Map<String, String> buildSamzaKinesisSystemConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue, String bootstrapStreamName) {
+    Map<String, String> result = new HashMap<>();
+    result.put(String.format(SYSTEM_FACTORY_KEY_FORMAT, systemName), KINESIS_SYSTEM_FACTORY);
+    result.put("job.systemstreampartition.grouper.factory", sspGrouperFactory);
+    if (broadcastStreamConfigValue != null && !broadcastStreamConfigValue.isEmpty()) {
+      result.put("task.broadcast.inputs", broadcastStreamConfigValue);
+    }
+    if (bootstrapStreamName != null && !bootstrapStreamName.isEmpty()) {
+      result.put(String.format(BOOTSTRAP_KEY_FORMAT, systemName, bootstrapStreamName), "true");
+    }
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
new file mode 100644
index 0000000..6379fcc
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Life-cycle tests for {@link KinesisRecordProcessor}: initialize, process records, checkpoint and shut down
+ * (both with the ZOMBIE and the TERMINATE shutdown reason). The static helpers are also used by
+ * {@code TestKinesisSystemConsumer}.
+ */
+public class TestKinesisRecordProcessor {
+  // Upper bound for latch waits: the processor's poll interval constant plus one second of slack.
+  private static final long MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS =
+      KinesisRecordProcessor.POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS + 1000;
+  private static final String SYSTEM = "kinesis";
+  private static final String STREAM = "stream";
+
+  @Test
+  public void testLifeCycleWithEvents() throws InterruptedException, ShutdownException, InvalidStateException,
+      NoSuchFieldException, IllegalAccessException {
+    testLifeCycleHelper(5);
+  }
+
+  @Test
+  public void testLifeCycleWithNoEvents() throws InterruptedException, ShutdownException, InvalidStateException,
+      NoSuchFieldException, IllegalAccessException {
+    testLifeCycleHelper(0);
+  }
+
+  /**
+   * Runs initialize -> processRecords -> (checkpoint) -> shutdown(ZOMBIE) against a single processor and
+   * verifies that the listener saw the records and the shutdown.
+   *
+   * @param numRecords number of records handed to the processor; with 0 no checkpoint is issued
+   */
+  private void testLifeCycleHelper(int numRecords) throws InterruptedException, ShutdownException,
+      InvalidStateException, NoSuchFieldException, IllegalAccessException {
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+    final CountDownLatch receivedRecordsLatch = new CountDownLatch(numRecords > 0 ? 1 : 0);
+    KinesisRecordProcessor processor =
+        createAndInitProcessor(createListener(receivedRecordsLatch, receivedShutdownLatch));
+
+    // Call processRecords on the processor
+    List<Record> records = generateRecords(numRecords, Collections.singletonList(processor)).get(processor);
+
+    // Verify there is a receivedRecords call to listener.
+    Assert.assertEquals("Unable to receive records.", 0, receivedRecordsLatch.getCount());
+
+    if (numRecords > 0) {
+      // Call checkpoint on last record
+      processor.checkpoint(records.get(records.size() - 1).getSequenceNumber());
+    }
+
+    // Call shutdown (with ZOMBIE reason) on processor and verify that the processor calls shutdown on the listener.
+    shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+
+    // Verify that the processor is shutdown.
+    Assert.assertEquals("Unable to shutdown processor.", 0, receivedShutdownLatch.getCount());
+  }
+
+  /**
+   * Test the scenario where a processor instance is created for a shard and, while it is processing records, the
+   * shard gets re-assigned to the same consumer. This results in a new processor instance owning the shard, and
+   * this instance could receive checkpoint calls for records that were processed by the old processor instance.
+   * This test covers the case where the new instance receives the checkpoint call after it finished initialization
+   * and before it processed any records.
+   */
+  @Test
+  public void testCheckpointAfterInit() throws InterruptedException, ShutdownException, InvalidStateException,
+      NoSuchFieldException, IllegalAccessException {
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+    // No records are expected here, so the records latch starts at zero and countDown() on it is a no-op.
+    KinesisRecordProcessor processor =
+        createAndInitProcessor(createListener(new CountDownLatch(0), receivedShutdownLatch));
+
+    // Call checkpoint. This checkpoint could have originally headed to the processor instance for the same shard but
+    // due to reassignment a new processor instance is created.
+    processor.checkpoint("1234567");
+
+    // Call shutdown (with ZOMBIE reason) on processor and verify that the processor calls shutdown on the listener.
+    shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+
+    // Verify that the processor is shutdown.
+    Assert.assertEquals("Unable to shutdown processor.", 0, receivedShutdownLatch.getCount());
+  }
+
+  @Test
+  public void testShutdownDuringReshardWithEvents() throws InterruptedException, ShutdownException,
+      InvalidStateException, NoSuchFieldException, IllegalAccessException {
+    testShutdownDuringReshardHelper(5);
+  }
+
+  @Test
+  public void testShutdownDuringReshardWithNoEvents() throws InterruptedException, ShutdownException,
+      InvalidStateException, NoSuchFieldException, IllegalAccessException {
+    testShutdownDuringReshardHelper(0);
+  }
+
+  /**
+   * Verifies shutdown with the TERMINATE reason: with no records the listener is notified right away; with records
+   * the listener must not be notified until the last consumed record has been checkpointed.
+   *
+   * @param numRecords number of records handed to the processor before shutdown is requested
+   */
+  private void testShutdownDuringReshardHelper(int numRecords)
+      throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
+      IllegalAccessException {
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+    final CountDownLatch receivedRecordsLatch = new CountDownLatch(numRecords > 0 ? 1 : 0);
+    KinesisRecordProcessor processor =
+        createAndInitProcessor(createListener(receivedRecordsLatch, receivedShutdownLatch));
+
+    // Call processRecords on the processor
+    List<Record> records = generateRecords(numRecords, Collections.singletonList(processor)).get(processor);
+
+    // Verify there is a receivedRecords call to listener.
+    Assert.assertEquals("Unable to receive records.", 0, receivedRecordsLatch.getCount());
+
+    // Call shutdown (with TERMINATE reason) on processor and verify that the processor does not call shutdown on the
+    // listener until checkpoint is called for the last record consumed from shard. The call runs on its own thread
+    // because it is expected to stay pending until that checkpoint; the thread is a daemon so that a shutdown call
+    // that never returns (e.g. after a failed assertion below) cannot keep the test JVM alive.
+    Thread shutdownThread = new Thread(() -> shutDownProcessor(processor, ShutdownReason.TERMINATE),
+        "kinesis-record-processor-shutdown");
+    shutdownThread.setDaemon(true);
+    shutdownThread.start();
+
+    // If there are no records, the processor should shutdown immediately.
+    if (numRecords == 0) {
+      Assert.assertTrue("Unable to shutdown processor.",
+          receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+      return;
+    }
+
+    Assert.assertFalse("Processor shutdown too early.",
+        receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+
+    // Call checkpoint for the last but one record and the processor should still not call shutdown on listener.
+    processor.checkpoint(records.get(records.size() - 2).getSequenceNumber());
+    Assert.assertFalse("Processor shutdown too early.",
+        receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+
+    // Call checkpoint for the last record and the parent partition should be removed from mapper.
+    processor.checkpoint(records.get(records.size() - 1).getSequenceNumber());
+    Assert.assertTrue("Unable to shutdown processor.",
+        receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+  }
+
+  /** Creates a listener that counts down the given latches when records arrive and when shutdown is signalled. */
+  private static KinesisRecordProcessorListener createListener(CountDownLatch receivedRecordsLatch,
+      CountDownLatch receivedShutdownLatch) {
+    return new KinesisRecordProcessorListener() {
+      @Override
+      public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+        receivedRecordsLatch.countDown();
+      }
+
+      @Override
+      public void onShutdown(SystemStreamPartition ssp) {
+        receivedShutdownLatch.countDown();
+      }
+    };
+  }
+
+  /** Creates a processor for partition 0 of the test stream and initializes it for shard "shard-0000". */
+  private static KinesisRecordProcessor createAndInitProcessor(KinesisRecordProcessorListener listener) {
+    KinesisRecordProcessor processor =
+        new KinesisRecordProcessor(new SystemStreamPartition(SYSTEM, STREAM, new Partition(0)), listener);
+    ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+    InitializationInput initializationInput =
+        new InitializationInput().withShardId("shard-0000").withExtendedSequenceNumber(seqNum);
+    processor.initialize(initializationInput);
+    return processor;
+  }
+
+  /**
+   * Creates {@code numRecordsPerShard} records for each processor and feeds them to it through a mocked
+   * {@link ProcessRecordsInput} whose checkpointer is a no-op mock.
+   *
+   * @return the records handed to each processor, keyed by processor
+   */
+  static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
+      List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
+    Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
+    // A plain loop lets the declared checked exceptions propagate instead of being wrapped in RuntimeException.
+    for (KinesisRecordProcessor processor : processors) {
+      // Create records and call process records
+      IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
+      doNothing().when(checkpointer).checkpoint(anyString());
+      doNothing().when(checkpointer).checkpoint();
+      ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
+      when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
+      when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);
+      List<Record> inputRecords = createRecords(numRecordsPerShard);
+      processorRecordMap.put(processor, inputRecords);
+      when(processRecordsInput.getRecords()).thenReturn(inputRecords);
+      processor.processRecords(processRecordsInput);
+    }
+    return processorRecordMap;
+  }
+
+  /**
+   * Calls {@code shutdown} on the processor with a mocked {@link ShutdownInput} carrying the given reason and the
+   * processor's current checkpointer. Any exception is rethrown wrapped in a {@link RuntimeException}.
+   */
+  static void shutDownProcessor(KinesisRecordProcessor processor, ShutdownReason reason) {
+    try {
+      ShutdownInput shutdownInput = Mockito.mock(ShutdownInput.class);
+      when(shutdownInput.getShutdownReason()).thenReturn(reason);
+      when(shutdownInput.getCheckpointer()).thenReturn(getCheckpointer(processor));
+      processor.shutdown(shutdownInput);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /** Reads the processor's private {@code checkpointer} field through reflection. */
+  static IRecordProcessorCheckpointer getCheckpointer(KinesisRecordProcessor processor)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = processor.getClass().getDeclaredField("checkpointer");
+    f.setAccessible(true);
+    return (IRecordProcessorCheckpointer) f.get(processor);
+  }
+
+  /**
+   * Creates {@code numRecords} records with random partition keys and increasing, zero-padded sequence numbers
+   * ("0001", "0006", "0011", ...).
+   */
+  private static List<Record> createRecords(int numRecords) {
+    List<Record> records = new ArrayList<>(numRecords);
+    Random rand = new Random();
+
+    for (int i = 0; i < numRecords; i++) {
+      String dataStr = "testData-" + System.currentTimeMillis();
+      ByteBuffer data = ByteBuffer.wrap(dataStr.getBytes(StandardCharsets.UTF_8));
+      String key = String.format("partitionKey-%d", rand.nextLong());
+      String seqNum = String.format("%04d", 5 * i + 1);
+      Record record = new Record()
+          .withData(data)
+          .withPartitionKey(key)
+          .withSequenceNumber(seqNum)
+          .withApproximateArrivalTimestamp(new Date());
+      records.add(record);
+    }
+    return records;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
new file mode 100644
index 0000000..ade02ac
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kinesis.consumer; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.IntStream; + +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.config.MapConfig; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.model.Record; + +import 
org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics;
+
+import static org.apache.samza.system.kinesis.consumer.TestKinesisRecordProcessor.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests that exercise KinesisSystemConsumer and KinesisRecordProcessor together.
+ */
+public class TestKinesisSystemConsumer {
+  private static final String SYSTEM_CONSUMER_REGISTER_OFFSET = "0000"; // Could be any string
+  // Timeout of a single poll() call issued by readEvents.
+  private static final long POLL_TIMEOUT_MS = Duration.ofSeconds(1).toMillis();
+  // Overall budget for readEvents, so a missing event fails the test instead of hanging it.
+  private static final long READ_EVENTS_TIMEOUT_NANOS = Duration.ofSeconds(30).toNanos();
+
+  @Test
+  public void testProcessRecords() throws InterruptedException, ShutdownException, InvalidStateException,
+      NoSuchFieldException, IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    int numShards = 2;
+    int numRecordsPerShard = 5;
+
+    testProcessRecordsHelper(system, stream, numShards, numRecordsPerShard);
+  }
+
+  @Test
+  public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, ShutdownException,
+      InvalidStateException, NoSuchFieldException, IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    int numShards = 1;
+    int numRecordsPerShard = 0;
+
+    testProcessRecordsHelper(system, stream, numShards, numRecordsPerShard);
+  }
+
+  /**
+   * Helper to simulate and test the life-cycle of record processing from a kinesis stream with a given number of shards
+   * 1. Creation of record processors.
+   * 2. Initialization of record processors.
+   * 3. Processing records via record processors.
+   * 4. Calling checkpoint on record processors.
+   * 5. Shutting down (due to re-assignment or lease expiration) record processors.
+   */
+  private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
+      throws InterruptedException, ShutdownException, InvalidStateException,
+             NoSuchFieldException, IllegalAccessException {
+
+    KinesisConfig kConfig = new KinesisConfig(new MapConfig());
+    // Create consumer
+    KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry());
+    initializeMetrics(consumer, stream);
+
+    // One SSP per shard, all registered with the consumer before any processor is created.
+    List<SystemStreamPartition> ssps = new LinkedList<>();
+    IntStream.range(0, numShards)
+        .forEach(p -> ssps.add(new SystemStreamPartition(system, stream, new Partition(p))));
+    ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET));
+
+    // Create Kinesis record processor factory
+    IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream);
+
+    // Create and initialize Kinesis record processor
+    Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards);
+    List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values());
+
+    // Generate records to Kinesis record processor
+    Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList);
+
+    // Read events from the BEM queue. Every shard contributes numRecordsPerShard events, so wait for the total
+    // across all shards rather than for a single shard's worth.
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages =
+        readEvents(new HashSet<>(ssps), consumer, numShards * numRecordsPerShard);
+    if (numRecordsPerShard == 0) {
+      // No input records and hence no messages
+      Assert.assertEquals(0, messages.size());
+      return;
+    }
+    Assert.assertEquals(numShards, messages.size());
+
+    Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
+    ssps.forEach(ssp -> {
+      try {
+        KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
+
+        // Verify that the read messages are received in order and are the same as input records
+        List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
+        Assert.assertEquals(numRecordsPerShard, envelopes.size());
+        List<Record> inputRecords = inputRecordMap.get(processor);
+        verifyRecords(envelopes, inputRecords, processor.getShardId());
+
+        // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
+        IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
+        consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
+        ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+        verify(getCheckpointer(processor)).checkpoint(argument.capture());
+        Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
+
+        // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
+        shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+        Assert.assertFalse(sspToProcessorMap.containsValue(processor));
+        Assert.assertTrue(isSspAvailable(consumer, ssp));
+      } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
+        throw new RuntimeException(ex);
+      }
+    });
+  }
+
+  /**
+   * Creates one processor per shard from the factory and initializes it with shard id "shard-NNNNN".
+   *
+   * @return the initialized processors keyed by shard id
+   */
+  private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
+    Map<String, KinesisRecordProcessor> processorMap = new HashMap<>();
+    for (int shard = 0; shard < numShards; shard++) {
+      String shardId = String.format("shard-%05d", shard);
+      // Create Kinesis processor
+      KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor();
+
+      // Initialize the shard
+      ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+      InitializationInput initializationInput =
+          new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum);
+      processor.initialize(initializationInput);
+      processorMap.put(shardId, processor);
+    }
+    return processorMap;
+  }
+
+  /**
+   * Polls the consumer until at least {@code numEvents} envelopes have been collected in total or the overall
+   * time budget is used up.
+   *
+   * @param ssps partitions to poll
+   * @param consumer consumer to poll
+   * @param numEvents total number of envelopes expected across all {@code ssps}
+   * @return the collected envelopes grouped by partition
+   * @throws SamzaException if fewer than {@code numEvents} envelopes arrived within the time budget
+   */
+  private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> readEvents(Set<SystemStreamPartition> ssps,
+      KinesisSystemConsumer consumer, int numEvents) throws InterruptedException {
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<>();
+    int totalEventsConsumed = 0;
+    long startNanos = System.nanoTime();
+
+    // Bounded loop: without the time check a lost event would spin here forever and the failure branch below
+    // could never be reached.
+    while (totalEventsConsumed < numEvents && System.nanoTime() - startNanos < READ_EVENTS_TIMEOUT_NANOS) {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> receivedMessages =
+          consumer.poll(ssps, POLL_TIMEOUT_MS);
+      receivedMessages.forEach((ssp, envelopes) ->
+          messages.computeIfAbsent(ssp, unused -> new ArrayList<>()).addAll(envelopes));
+      totalEventsConsumed = messages.values().stream().mapToInt(List::size).sum();
+    }
+
+    if (totalEventsConsumed < numEvents) {
+      String msg = String.format("Received only %d of %d events", totalEventsConsumed, numEvents);
+      throw new SamzaException(msg);
+    }
+    return messages;
+  }
+
+  /**
+   * Walks input records and output envelopes in lock step and checks key, sequence number, arrival timestamp,
+   * shard id, payload and offset of each envelope against the record at the same position.
+   */
+  private void verifyRecords(List<IncomingMessageEnvelope> outputRecords, List<Record> inputRecords, String shardId) {
+    Iterator<IncomingMessageEnvelope> outputRecordsIter = outputRecords.iterator();
+    inputRecords.forEach(record -> {
+      IncomingMessageEnvelope envelope = outputRecordsIter.next();
+      String outputKey = (String) envelope.getKey();
+      KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+      Assert.assertEquals(record.getPartitionKey(), outputKey);
+      Assert.assertEquals(record.getSequenceNumber(), kinesisMessageEnvelope.getSequenceNumber());
+      Assert.assertEquals(record.getApproximateArrivalTimestamp(),
+          kinesisMessageEnvelope.getApproximateArrivalTimestamp());
+      Assert.assertEquals(shardId, kinesisMessageEnvelope.getShardId());
+      ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
+      // Reset the input buffer's position so the comparison covers the whole payload.
+      record.getData().rewind();
+      Assert.assertEquals(record.getData(), outputData);
+      verifyOffset(envelope.getOffset(), record, shardId);
+    });
+  }
+
+  /** Parses the envelope offset and checks that it carries the record's sequence number and the shard id. */
+  private void verifyOffset(String offset, Record inputRecord, String shardId) {
+    KinesisSystemConsumerOffset ckpt = KinesisSystemConsumerOffset.parse(offset);
+    Assert.assertEquals(inputRecord.getSequenceNumber(), ckpt.getSeqNumber());
+    Assert.assertEquals(shardId, ckpt.getShardId());
+  }
+
+  /** Initializes the consumer's private {@code metrics} object (read through reflection) for the given stream. */
+  private void initializeMetrics(KinesisSystemConsumer consumer, String stream)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("metrics");
+    f.setAccessible(true);
+    KinesisSystemConsumerMetrics metrics = (KinesisSystemConsumerMetrics) f.get(consumer);
+    metrics.initializeMetrics(Collections.singleton(stream));
+  }
+
+  /** Reads the consumer's private {@code processors} map through reflection. */
+  @SuppressWarnings("unchecked")
+  private Map<SystemStreamPartition, KinesisRecordProcessor> getProcessorMap(KinesisSystemConsumer consumer)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("processors");
+    f.setAccessible(true);
+    return (Map<SystemStreamPartition, KinesisRecordProcessor>) f.get(consumer);
+  }
+
+  /**
+   * Returns whether {@code ssp} is listed under its stream in the {@code availableSsps} map of the consumer's
+   * SSP allocator (both read through reflection).
+   */
+  @SuppressWarnings("unchecked")
+  private boolean isSspAvailable(KinesisSystemConsumer consumer, SystemStreamPartition ssp)
+      throws NoSuchFieldException, IllegalAccessException {
+    SSPAllocator sspAllocator = getSspAllocator(consumer);
+    Field f = sspAllocator.getClass().getDeclaredField("availableSsps");
+    f.setAccessible(true);
+    Map<String, Set<SystemStreamPartition>> availableSsps = (Map<String, Set<SystemStreamPartition>>) f.get(
+        sspAllocator);
+    return availableSsps.containsKey(ssp.getStream()) && availableSsps.get(ssp.getStream()).contains(ssp);
+  }
+
+  /** Reads the consumer's private {@code sspAllocator} field through reflection. */
+  private SSPAllocator getSspAllocator(KinesisSystemConsumer consumer)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("sspAllocator");
+    f.setAccessible(true);
+    return (SSPAllocator) f.get(consumer);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java new file mode 100644 index 0000000..615a06e --- /dev/null +++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.system.kinesis.consumer; + +import org.junit.Assert; +import org.junit.Test; + + +/** + * Tests that a {@link KinesisSystemConsumerOffset} survives a {@code toString()} / {@code parse(String)} + * round trip and that offsets built from a different shard id or sequence number compare as not equal. + */ +public class TestKinesisSystemConsumerOffset { + /** + * An offset parsed back from its own {@code toString()} output must equal the original offset. + */ + @Test + public void testEquality() { + KinesisSystemConsumerOffset inCkpt = new KinesisSystemConsumerOffset("shard-00000", "123456"); + KinesisSystemConsumerOffset outCkpt = KinesisSystemConsumerOffset.parse(inCkpt.toString()); + Assert.assertEquals(inCkpt, outCkpt); + } + + /** + * An offset parsed from the string form of an offset with a different shard id, or with a + * different sequence number, must not equal the base offset. + */ + @Test + public void testInEquality() { + KinesisSystemConsumerOffset inCkpt = new KinesisSystemConsumerOffset("shard-00000", "123456"); + + // With different shardId + KinesisSystemConsumerOffset inCkpt1 = new KinesisSystemConsumerOffset("shard-00001", "123456"); + KinesisSystemConsumerOffset outCkpt = KinesisSystemConsumerOffset.parse(inCkpt1.toString()); + Assert.assertTrue(!inCkpt.equals(outCkpt)); + + // With different seqNumber + inCkpt1 = new KinesisSystemConsumerOffset("shard-00000", "123457"); + outCkpt = KinesisSystemConsumerOffset.parse(inCkpt1.toString()); + Assert.assertTrue(!inCkpt.equals(outCkpt)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java new file mode 100644 index 0000000..0533a29 --- /dev/null +++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kinesis.consumer; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.IntStream; +import org.apache.samza.Partition; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Assert; +import org.junit.Test; + + +/** + * Tests {@link SSPAllocator#allocate} and {@link SSPAllocator#free}. Each test first hands two + * partitions of one stream to the allocator through {@code free} and then exercises allocation. + */ +public class TestSSPAllocator { + /** + * Allocates both partitions and frees them again, checking after every step which partitions the + * allocator reports as available; allocation is expected to return partition 0 first, then partition 1. + */ + @Test + public void testAllocateAndFree() throws NoAvailablePartitionException, NoSuchFieldException, IllegalAccessException { + int numPartitions = 2; + String system = "kinesis"; + String stream = "stream"; + List<SystemStreamPartition> ssps = new ArrayList<>(); + IntStream.range(0, numPartitions) + .forEach(i -> ssps.add(new SystemStreamPartition(system, stream, new Partition(i)))); + + SSPAllocator allocator = new SSPAllocator(); + ssps.forEach(allocator::free); + + Assert.assertTrue(isSspAvailable(allocator, ssps.get(0))); + Assert.assertTrue(isSspAvailable(allocator, ssps.get(1))); + + SystemStreamPartition ssp = allocator.allocate(stream); + Assert.assertFalse(isSspAvailable(allocator, ssps.get(0))); + Assert.assertTrue(isSspAvailable(allocator, ssps.get(1))); + Assert.assertEquals(ssp, ssps.get(0)); + + ssp = allocator.allocate(stream); + Assert.assertFalse(isSspAvailable(allocator, ssps.get(0))); + Assert.assertFalse(isSspAvailable(allocator, ssps.get(1))); + 
Assert.assertEquals(ssp, ssps.get(1)); + + allocator.free(ssps.get(1)); + Assert.assertFalse(isSspAvailable(allocator, ssps.get(0))); + Assert.assertTrue(isSspAvailable(allocator, ssps.get(1))); + + allocator.free(ssps.get(0)); + Assert.assertTrue(isSspAvailable(allocator, ssps.get(0))); + Assert.assertTrue(isSspAvailable(allocator, ssps.get(1))); + } + + /** + * With only two partitions handed to the allocator, a third {@code allocate} call is expected to + * throw {@link NoAvailablePartitionException}. + */ + @Test (expected = NoAvailablePartitionException.class) + public void testAssignMoreThanMaxPartitions() throws NoAvailablePartitionException { + int numPartitions = 2; + String system = "kinesis"; + String stream = "stream"; + List<SystemStreamPartition> ssps = new ArrayList<>(); + IntStream.range(0, numPartitions) + .forEach(i -> ssps.add(new SystemStreamPartition(system, stream, new Partition(i)))); + + SSPAllocator allocator = new SSPAllocator(); + ssps.forEach(allocator::free); + + allocator.allocate(stream); + allocator.allocate(stream); + allocator.allocate(stream); // An exception should be thrown at this point. + } + + /** + * Freeing the same allocated partition a second time is expected to throw + * {@link IllegalArgumentException}. + */ + @Test (expected = IllegalArgumentException.class) + public void testFreeSameSspTwice() throws NoAvailablePartitionException { + int numPartitions = 2; + String system = "kinesis"; + String stream = "stream"; + List<SystemStreamPartition> ssps = new ArrayList<>(); + IntStream.range(0, numPartitions) + .forEach(i -> ssps.add(new SystemStreamPartition(system, stream, new Partition(i)))); + + SSPAllocator allocator = new SSPAllocator(); + ssps.forEach(allocator::free); + + SystemStreamPartition ssp = allocator.allocate(stream); + allocator.free(ssp); + allocator.free(ssp); // An exception should be thrown at this point. 
+ } + + /** + * After a single {@code allocate} call, freeing partition 1 (which testAllocateAndFree shows is + * still available at that point) is expected to throw {@link IllegalArgumentException}. + */ + @Test (expected = IllegalArgumentException.class) + public void testFreeUnallocatedSsp() throws NoAvailablePartitionException { + int numPartitions = 2; + String system = "kinesis"; + String stream = "stream"; + List<SystemStreamPartition> ssps = new ArrayList<>(); + IntStream.range(0, numPartitions) + .forEach(i -> ssps.add(new SystemStreamPartition(system, stream, new Partition(i)))); + + SSPAllocator allocator = new SSPAllocator(); + ssps.forEach(allocator::free); + + allocator.allocate(stream); + allocator.free(ssps.get(1)); // An exception should be thrown at this point. + } + + /** + * Returns whether {@code ssp} is contained, under its stream name, in the allocator's declared + * {@code availableSsps} field, which is read via reflection. + */ + @SuppressWarnings("unchecked") + private boolean isSspAvailable(SSPAllocator sspAllocator, SystemStreamPartition ssp) throws NoSuchFieldException, IllegalAccessException { + Field f = sspAllocator.getClass().getDeclaredField("availableSsps"); + f.setAccessible(true); + Map<String, Set<SystemStreamPartition>> availableSsps = (Map<String, Set<SystemStreamPartition>>) f.get( + sspAllocator); + return availableSsps.containsKey(ssp.getStream()) && availableSsps.get(ssp.getStream()).contains(ssp); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 0fe3dfa..e50e816 100644 --- a/settings.gradle +++ b/settings.gradle @@ -24,9 +24,8 @@ include \ 'samza-rest', 'samza-shell', 'samza-azure', - 'samza-sql' - - + 'samza-sql', + 'samza-aws' def scalaModules = [ 'samza-core',