Repository: samza
Updated Branches:
  refs/heads/master 5e68d621a -> 9961023f7


http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
new file mode 100644
index 0000000..5b3b335
--- /dev/null
+++ 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import 
org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+public class TestKinesisSystemFactory {
+  // Format string (despite the name, not a regular expression) for the config key that holds a system's factory
+  // class; %s is replaced with the system name.
+  private static final String SYSTEM_FACTORY_REGEX = "systems.%s.samza.factory";
+  // Class name of the factory under test, used as the value for the key above.
+  private static final String KINESIS_SYSTEM_FACTORY = KinesisSystemFactory.class.getName();
+
+  /**
+   * Verifies that the factory hands out a consumer for the configured system and that the consumer is a different
+   * object from the admin created for the same system.
+   */
+  @Test
+  public void testGetConsumer() {
+    String systemName = "test";
+    Config config = buildKinesisConsumerConfig(systemName);
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    MetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
+
+    // Use systemName for both calls so consumer and admin are always requested for the same system.
+    Object consumer = factory.getConsumer(systemName, config, metricsRegistry);
+    Object admin = factory.getAdmin(systemName, config);
+
+    // assertNotSame on its own would also pass if getConsumer returned null.
+    Assert.assertNotNull(consumer);
+    Assert.assertNotSame(consumer, admin);
+  }
+
+  /**
+   * Expects a {@link ConfigException} from getAdmin when the configured SSP grouper factory is
+   * SystemStreamPartitionGrouperFactory. The test is currently disabled via {@link Ignore}.
+   */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithIncorrectSspGrouper() {
+    final String kinesisSystem = "test";
+    final String grouperFactory =
+        "org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory";
+    KinesisSystemFactory systemFactory = new KinesisSystemFactory();
+    Config consumerConfig = buildKinesisConsumerConfig(kinesisSystem, grouperFactory);
+    systemFactory.getAdmin(kinesisSystem, consumerConfig);
+  }
+
+  /**
+   * Expects a {@link ConfigException} from getAdmin when a broadcast input ("test.stream#0") is configured.
+   * The test is currently disabled via {@link Ignore}.
+   */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithBroadcastStreams() {
+    final String kinesisSystem = "test";
+    final String grouperFactory =
+        "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory";
+    final String broadcastInputs = "test.stream#0";
+    KinesisSystemFactory systemFactory = new KinesisSystemFactory();
+    Config consumerConfig = buildKinesisConsumerConfig(kinesisSystem, grouperFactory, broadcastInputs);
+    systemFactory.getAdmin(kinesisSystem, consumerConfig);
+  }
+
+  /**
+   * Expects a {@link ConfigException} from getAdmin when the stream "kinesis-stream" is flagged as a bootstrap
+   * stream. The test is currently disabled via {@link Ignore}.
+   */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithBootstrapStream() {
+    final String kinesisSystem = "test";
+    final String grouperFactory =
+        "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory";
+    final String bootstrapStream = "kinesis-stream";
+    KinesisSystemFactory systemFactory = new KinesisSystemFactory();
+    // No broadcast inputs (null); only the bootstrap flag is set.
+    Config consumerConfig = buildKinesisConsumerConfig(kinesisSystem, grouperFactory, null, bootstrapStream);
+    systemFactory.getAdmin(kinesisSystem, consumerConfig);
+  }
+
+  /** Config for {@code systemName} with AllSspToSingleTaskGrouperFactory and no broadcast or bootstrap streams. */
+  private static Config buildKinesisConsumerConfig(String systemName) {
+    String defaultGrouper = AllSspToSingleTaskGrouperFactory.class.getCanonicalName();
+    return buildKinesisConsumerConfig(systemName, defaultGrouper, null, null);
+  }
+
+  /** Config for {@code systemName} with the given grouper factory and no broadcast or bootstrap streams. */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory) {
+    return buildKinesisConsumerConfig(systemName, sspGrouperFactory, null, null);
+  }
+
+  /** Config for {@code systemName} with the given grouper factory and broadcast inputs, no bootstrap stream. */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue) {
+    return buildKinesisConsumerConfig(systemName, sspGrouperFactory, broadcastStreamConfigValue, null);
+  }
+
+  /**
+   * Config for {@code systemName} with the given grouper factory, broadcast inputs and bootstrap stream; null or
+   * empty values for the last two leave the corresponding keys unset.
+   */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue, String bootstrapStreamName) {
+    return new MapConfig(
+        buildSamzaKinesisSystemConfig(systemName, sspGrouperFactory, broadcastStreamConfigValue, bootstrapStreamName));
+  }
+
+  /**
+   * Assembles the raw config entries for a Kinesis system: the system factory, the SSP grouper factory and,
+   * when supplied, the broadcast inputs and the bootstrap flag for one stream.
+   *
+   * @param systemName name of the system the entries are built for
+   * @param sspGrouperFactory value stored under "job.systemstreampartition.grouper.factory"
+   * @param broadcastStreamConfigValue value for "task.broadcast.inputs"; skipped when null or empty
+   * @param bootstrapStreamName stream to flag as bootstrap; skipped when null or empty
+   * @return a mutable map holding the entries
+   */
+  private static Map<String, String> buildSamzaKinesisSystemConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue, String bootstrapStreamName) {
+    boolean hasBroadcastInputs = !(broadcastStreamConfigValue == null || broadcastStreamConfigValue.isEmpty());
+    boolean hasBootstrapStream = !(bootstrapStreamName == null || bootstrapStreamName.isEmpty());
+
+    Map<String, String> entries = new HashMap<>();
+    String factoryKey = String.format(SYSTEM_FACTORY_REGEX, systemName);
+    entries.put(factoryKey, KINESIS_SYSTEM_FACTORY);
+    entries.put("job.systemstreampartition.grouper.factory", sspGrouperFactory);
+    if (hasBroadcastInputs) {
+      entries.put("task.broadcast.inputs", broadcastStreamConfigValue);
+    }
+    if (hasBootstrapStream) {
+      String bootstrapKey = "systems." + systemName + ".streams." + bootstrapStreamName + ".samza.bootstrap";
+      entries.put(bootstrapKey, "true");
+    }
+    return entries;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
new file mode 100644
index 0000000..6379fcc
--- /dev/null
+++ 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import 
com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestKinesisRecordProcessor {
+  // Longest time the tests wait on the shutdown latch: the processor's parent-shard shutdown poll interval plus
+  // 1000 of slack (milliseconds, going by the _MS suffix and the TimeUnit.MILLISECONDS used with await).
+  private static final long MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS =
+      KinesisRecordProcessor.POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS + 1000;
+
+  /** Runs the processor life-cycle scenario with five records. */
+  @Test
+  public void testLifeCycleWithEvents()
+      throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
+             IllegalAccessException {
+    final int recordCount = 5;
+    testLifeCycleHelper(recordCount);
+  }
+
+  /** Runs the processor life-cycle scenario with an empty batch of records. */
+  @Test
+  public void testLifeCycleWithNoEvents()
+      throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
+             IllegalAccessException {
+    final int recordCount = 0;
+    testLifeCycleHelper(recordCount);
+  }
+
+  /**
+   * Drives one {@link KinesisRecordProcessor} through initialize, processRecords, an optional checkpoint of the last
+   * record and a ZOMBIE shutdown, asserting on the listener callbacks along the way.
+   *
+   * @param numRecords number of records passed to processRecords; may be 0
+   */
+  private void testLifeCycleHelper(int numRecords) throws InterruptedException, ShutdownException,
+                                                          InvalidStateException, NoSuchFieldException,
+                                                          IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+    // With zero records this latch starts at 0, so the received-records assertion below passes trivially.
+    final CountDownLatch receivedRecordsLatch = new CountDownLatch(numRecords > 0 ? 1 : 0);
+
+    KinesisRecordProcessorListener listener = new KinesisRecordProcessorListener() {
+      @Override
+      public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+        receivedRecordsLatch.countDown();
+      }
+
+      @Override
+      public void onShutdown(SystemStreamPartition ssp) {
+        receivedShutdownLatch.countDown();
+      }
+    };
+
+    KinesisRecordProcessor processor =
+        new KinesisRecordProcessor(new SystemStreamPartition(system, stream, new Partition(0)), listener);
+
+    // Initialize the processor
+    ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+    InitializationInput initializationInput =
+        new InitializationInput().withShardId("shard-0000").withExtendedSequenceNumber(seqNum);
+    processor.initialize(initializationInput);
+
+    // Call processRecords on the processor
+    List<Record> records = generateRecords(numRecords, Collections.singletonList(processor)).get(processor);
+
+    // Verification steps
+
+    // Verify there is a receivedRecords call to listener. assertEquals reports the outstanding count on failure.
+    Assert.assertEquals("Unable to receive records.", 0L, receivedRecordsLatch.getCount());
+
+    if (numRecords > 0) {
+      // Call checkpoint on last record
+      processor.checkpoint(records.get(records.size() - 1).getSequenceNumber());
+    }
+
+
+    // Call shutdown (with ZOMBIE reason) on processor and verify that the processor calls shutdown on the listener.
+    shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+
+    // Verify that the processor is shutdown.
+    Assert.assertEquals("Unable to shutdown processor.", 0L, receivedShutdownLatch.getCount());
+  }
+
+  /**
+   * Tests the scenario where a processor instance is created for a shard and, while it is processing records, the
+   * shard is re-assigned to the same consumer. This results in a new processor instance owning the shard, and that
+   * instance can receive checkpoint calls for records that were processed by the old instance. This test covers the
+   * case where the new instance receives the checkpoint call after it has finished initialization but before it has
+   * processed any records.
+   */
+  @Test
+  public void testCheckpointAfterInit() throws InterruptedException, ShutdownException, InvalidStateException,
+                                               NoSuchFieldException, IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+
+    KinesisRecordProcessorListener listener = new KinesisRecordProcessorListener() {
+      @Override
+      public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+        // No records are handed to the processor in this test, so there is nothing to track here.
+      }
+
+      @Override
+      public void onShutdown(SystemStreamPartition ssp) {
+        receivedShutdownLatch.countDown();
+      }
+    };
+
+    KinesisRecordProcessor processor =
+        new KinesisRecordProcessor(new SystemStreamPartition(system, stream, new Partition(0)), listener);
+
+    // Initialize the processor
+    ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+    InitializationInput initializationInput =
+        new InitializationInput().withShardId("shard-0000").withExtendedSequenceNumber(seqNum);
+    processor.initialize(initializationInput);
+
+    // Call checkpoint. This checkpoint could have originally been headed to the previous processor instance for the
+    // same shard, but due to reassignment a new processor instance was created.
+    processor.checkpoint("1234567");
+
+
+    // Call shutdown (with ZOMBIE reason) on processor and verify that the processor calls shutdown on the listener.
+    shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+
+    // Verify that the processor is shutdown. assertEquals reports the outstanding count on failure.
+    Assert.assertEquals("Unable to shutdown processor.", 0L, receivedShutdownLatch.getCount());
+  }
+
+  /** Runs the TERMINATE-shutdown (reshard) scenario with five records. */
+  @Test
+  public void testShutdownDuringReshardWithEvents()
+      throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
+             IllegalAccessException {
+    final int recordCount = 5;
+    testShutdownDuringReshardHelper(recordCount);
+  }
+
+  /** Runs the TERMINATE-shutdown (reshard) scenario with an empty batch of records. */
+  @Test
+  public void testShutdownDuringReshardWithNoEvents()
+      throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
+             IllegalAccessException {
+    final int recordCount = 0;
+    testShutdownDuringReshardHelper(recordCount);
+  }
+
+  /**
+   * Exercises a TERMINATE shutdown. With no records the listener's onShutdown must arrive within
+   * {@link #MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS}; with records it must not arrive until the last record has been
+   * checkpointed.
+   *
+   * @param numRecords number of records passed to processRecords; either 0 or at least 2, because the
+   *                   last-but-one record is checkpointed explicitly
+   */
+  private void testShutdownDuringReshardHelper(int numRecords)
+      throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
+             IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+    // With zero records this latch starts at 0, so the received-records assertion below passes trivially.
+    final CountDownLatch receivedRecordsLatch = new CountDownLatch(numRecords > 0 ? 1 : 0);
+
+    KinesisRecordProcessorListener listener = new KinesisRecordProcessorListener() {
+      @Override
+      public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+        receivedRecordsLatch.countDown();
+      }
+
+      @Override
+      public void onShutdown(SystemStreamPartition ssp) {
+        receivedShutdownLatch.countDown();
+      }
+    };
+
+    KinesisRecordProcessor processor =
+        new KinesisRecordProcessor(new SystemStreamPartition(system, stream, new Partition(0)), listener);
+
+    // Initialize the processor
+    ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+    InitializationInput initializationInput =
+        new InitializationInput().withShardId("shard-0000").withExtendedSequenceNumber(seqNum);
+    processor.initialize(initializationInput);
+
+    // Call processRecords on the processor
+    List<Record> records = generateRecords(numRecords, Collections.singletonList(processor)).get(processor);
+
+    // Verification steps
+
+    // Verify there is a receivedRecords call to listener. assertEquals reports the outstanding count on failure.
+    Assert.assertEquals("Unable to receive records.", 0L, receivedRecordsLatch.getCount());
+
+    // Call shutdown (with TERMINATE reason) on processor and verify that the processor does not call shutdown on the
+    // listener until checkpoint is called for the last record consumed from shard. The call runs on its own thread
+    // so that this thread stays free to issue the checkpoints below.
+    new Thread(() -> shutDownProcessor(processor, ShutdownReason.TERMINATE)).start();
+
+    // If there are no records, the processor should shutdown immediately.
+    if (numRecords == 0) {
+      Assert.assertTrue("Unable to shutdown processor.",
+          receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+      return;
+    }
+
+    Assert.assertFalse("Processor shutdown too early.",
+        receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+
+    // Call checkpoint for the last but one record and the processor should still not call shutdown on listener.
+    processor.checkpoint(records.get(records.size() - 2).getSequenceNumber());
+    Assert.assertFalse("Processor shutdown too early.",
+        receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+
+    // Call checkpoint for the last record; the processor should now call shutdown on the listener.
+    processor.checkpoint(records.get(records.size() - 1).getSequenceNumber());
+    Assert.assertTrue("Unable to shutdown processor.",
+        receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+  }
+
+  /**
+   * Creates {@code numRecordsPerShard} records for each processor and feeds them to it through a mocked
+   * {@link ProcessRecordsInput} whose checkpointer is a mock as well.
+   *
+   * @param numRecordsPerShard number of records generated for every processor
+   * @param processors processors that receive a processRecords call
+   * @return the records handed to each processor, keyed by processor
+   */
+  static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
+      List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
+    Map<KinesisRecordProcessor, List<Record>> recordsByProcessor = new HashMap<>();
+    for (KinesisRecordProcessor processor : processors) {
+      try {
+        // Checkpointer mock whose checkpoint calls do nothing
+        IRecordProcessorCheckpointer mockCheckpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
+        doNothing().when(mockCheckpointer).checkpoint(anyString());
+        doNothing().when(mockCheckpointer).checkpoint();
+
+        // Input mock that exposes the checkpointer, a fixed lag and the generated records
+        ProcessRecordsInput mockInput = Mockito.mock(ProcessRecordsInput.class);
+        when(mockInput.getCheckpointer()).thenReturn(mockCheckpointer);
+        when(mockInput.getMillisBehindLatest()).thenReturn(1000L);
+        List<Record> generated = createRecords(numRecordsPerShard);
+        recordsByProcessor.put(processor, generated);
+        when(mockInput.getRecords()).thenReturn(generated);
+
+        processor.processRecords(mockInput);
+      } catch (ShutdownException | InvalidStateException ex) {
+        // Same wrapping as before: checked exceptions from the stubbing calls surface as RuntimeException.
+        throw new RuntimeException(ex);
+      }
+    }
+    return recordsByProcessor;
+  }
+
+  /**
+   * Calls shutdown on the processor with a mocked {@link ShutdownInput} that reports the given reason and returns
+   * the processor's own checkpointer (read reflectively). Any exception is rethrown wrapped in a RuntimeException.
+   */
+  static void shutDownProcessor(KinesisRecordProcessor processor, ShutdownReason reason) {
+    try {
+      ShutdownInput mockInput = Mockito.mock(ShutdownInput.class);
+      when(mockInput.getShutdownReason()).thenReturn(reason);
+      IRecordProcessorCheckpointer processorCheckpointer = getCheckpointer(processor);
+      when(mockInput.getCheckpointer()).thenReturn(processorCheckpointer);
+      processor.shutdown(mockInput);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /** Reads the processor's private "checkpointer" field via reflection. */
+  static IRecordProcessorCheckpointer getCheckpointer(KinesisRecordProcessor processor)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field checkpointerField = processor.getClass().getDeclaredField("checkpointer");
+    checkpointerField.setAccessible(true);
+    Object fieldValue = checkpointerField.get(processor);
+    return (IRecordProcessorCheckpointer) fieldValue;
+  }
+
+  /**
+   * Builds {@code numRecords} records with a time-stamped payload, a random partition key, the current time as
+   * arrival timestamp and zero-padded sequence numbers 0001, 0006, 0011, ...
+   */
+  private static List<Record> createRecords(int numRecords) {
+    List<Record> generated = new ArrayList<>(numRecords);
+    Random keyGenerator = new Random();
+
+    int index = 0;
+    while (index < numRecords) {
+      byte[] payload = ("testData-" + System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8);
+      String partitionKey = String.format("partitionKey-%d", keyGenerator.nextLong());
+      String sequenceNumber = String.format("%04d", 5 * index + 1);
+      generated.add(new Record()
+          .withData(ByteBuffer.wrap(payload))
+          .withPartitionKey(partitionKey)
+          .withSequenceNumber(sequenceNumber)
+          .withApproximateArrivalTimestamp(new Date()));
+      index++;
+    }
+    return generated;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
new file mode 100644
index 0000000..ade02ac
--- /dev/null
+++ 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import 
com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics;
+
+import static 
org.apache.samza.system.kinesis.consumer.TestKinesisRecordProcessor.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * This class tests KinesisSystemConsumer and KinesisRecordProcessor together.
+ */
+public class TestKinesisSystemConsumer {
+  // Offset passed to consumer.register for every partition in these tests.
+  private static final String SYSTEM_CONSUMER_REGISTER_OFFSET = "0000"; // Could be any string
+
+  /** Runs the record-processing scenario with two shards and five records per shard. */
+  @Test
+  public void testProcessRecords()
+      throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
+             IllegalAccessException {
+    final int shardCount = 2;
+    final int recordsPerShard = 5;
+    testProcessRecordsHelper("kinesis", "stream", shardCount, recordsPerShard);
+  }
+
+  /** Runs the record-processing scenario with a single shard that receives an empty record list. */
+  @Test
+  public void testProcessRecordsWithEmptyRecordList()
+      throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
+             IllegalAccessException {
+    final int shardCount = 1;
+    final int recordsPerShard = 0;
+    testProcessRecordsHelper("kinesis", "stream", shardCount, recordsPerShard);
+  }
+
+  /**
+   * Helper to simulate and test the life-cycle of record processing from a kinesis stream with a given number of
+   * shards:
+   * 1. Creation of record processors.
+   * 2. Initialization of record processors.
+   * 3. Processing records via record processors.
+   * 4. Calling checkpoint on record processors.
+   * 5. Shutting down (due to re-assignment or lease expiration) record processors.
+   *
+   * @param system system name used for the consumer and the partitions
+   * @param stream stream name used for the partitions and the record processor factory
+   * @param numShards number of partitions registered and processors created
+   * @param numRecordsPerShard number of records fed to every processor; with 0 only the absence of messages is
+   *                           verified
+   */
+  private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
+      throws InterruptedException, ShutdownException, InvalidStateException,
+             NoSuchFieldException, IllegalAccessException {
+
+    KinesisConfig kConfig = new KinesisConfig(new MapConfig());
+    // Create consumer
+    KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry());
+    initializeMetrics(consumer, stream);
+
+    List<SystemStreamPartition> ssps = new LinkedList<>();
+    for (int p = 0; p < numShards; p++) {
+      ssps.add(new SystemStreamPartition(system, stream, new Partition(p)));
+    }
+    ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET));
+
+    // Create Kinesis record processor factory
+    IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream);
+
+    // Create and initialize Kinesis record processor
+    Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards);
+    List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values());
+
+    // Generate records to Kinesis record processor
+    Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList);
+
+    // Verification steps
+
+    // Read events from the BEM queue. readEvents counts envelopes across all partitions, so ask for the total;
+    // asking for one shard's worth would let it return before every shard has delivered.
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages =
+        readEvents(new HashSet<>(ssps), consumer, numShards * numRecordsPerShard);
+    if (numRecordsPerShard == 0) {
+      // No input records and hence no messages
+      Assert.assertEquals(0, messages.size());
+      return;
+    }
+    Assert.assertEquals(numShards, messages.size());
+
+    Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
+    for (SystemStreamPartition ssp : ssps) {
+      KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
+
+      // Verify that the read messages are received in order and are the same as input records
+      List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
+      Assert.assertNotNull("No messages received for " + ssp, envelopes);
+      Assert.assertEquals(numRecordsPerShard, envelopes.size());
+      List<Record> inputRecords = inputRecordMap.get(processor);
+      verifyRecords(envelopes, inputRecords, processor.getShardId());
+
+      // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
+      IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
+      consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
+      ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+      verify(getCheckpointer(processor)).checkpoint(argument.capture());
+      Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
+
+      // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
+      shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+      Assert.assertFalse(sspToProcessorMap.containsValue(processor));
+      Assert.assertTrue(isSspAvailable(consumer, ssp));
+    }
+  }
+
+  /**
+   * Creates one processor per shard through the factory and initializes each with shard id "shard-NNNNN" and
+   * sequence number "0000".
+   *
+   * @return the initialized processors keyed by shard id
+   */
+  private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
+    Map<String, KinesisRecordProcessor> processorsByShard = new HashMap<>();
+    IntStream.range(0, numShards)
+        .mapToObj(shardIndex -> String.format("shard-%05d", shardIndex))
+        .forEach(shardId -> {
+            // Create Kinesis processor
+            KinesisRecordProcessor recordProcessor = (KinesisRecordProcessor) factory.createProcessor();
+
+            // Initialize the shard
+            InitializationInput initInput = new InitializationInput()
+                .withShardId(shardId)
+                .withExtendedSequenceNumber(new ExtendedSequenceNumber("0000"));
+            recordProcessor.initialize(initInput);
+            processorsByShard.put(shardId, recordProcessor);
+          });
+    return processorsByShard;
+  }
+
+  /**
+   * Polls the consumer until at least {@code numEvents} envelopes have been read in total across {@code ssps}, or
+   * until an overall deadline of 30 seconds has passed.
+   *
+   * @param ssps partitions to poll
+   * @param consumer consumer to poll
+   * @param numEvents total number of envelopes expected across all partitions; 0 returns an empty map without
+   *                  polling
+   * @return the envelopes read, grouped by partition in arrival order
+   * @throws SamzaException if fewer than {@code numEvents} envelopes arrived before the deadline
+   */
+  private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> readEvents(Set<SystemStreamPartition> ssps,
+      KinesisSystemConsumer consumer, int numEvents) throws InterruptedException {
+    final long pollTimeoutMs = Duration.ofSeconds(1).toMillis();
+    // Bound the total wait so that missing events fail the test with a clear message instead of hanging it forever.
+    final long deadlineNanos = System.nanoTime() + Duration.ofSeconds(30).toNanos();
+
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<>();
+    int totalEventsConsumed = 0;
+
+    while (totalEventsConsumed < numEvents && System.nanoTime() - deadlineNanos < 0) {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> receivedMessages =
+          consumer.poll(ssps, pollTimeoutMs);
+      receivedMessages.forEach((ssp, envelopes) ->
+          messages.computeIfAbsent(ssp, unused -> new ArrayList<>()).addAll(envelopes));
+      totalEventsConsumed = messages.values().stream().mapToInt(List::size).sum();
+    }
+
+    if (totalEventsConsumed < numEvents) {
+      String msg = String.format("Received only %d of %d events", totalEventsConsumed, numEvents);
+      throw new SamzaException(msg);
+    }
+    return messages;
+  }
+
+  /**
+   * Checks that the envelopes read from the consumer match the input records one-to-one and in order: partition key,
+   * sequence number, approximate arrival timestamp, shard id, payload bytes and offset.
+   *
+   * @param outputRecords envelopes read from the consumer
+   * @param inputRecords records that were fed to the processor
+   * @param shardId shard id every envelope is expected to carry
+   */
+  private void verifyRecords(List<IncomingMessageEnvelope> outputRecords, List<Record> inputRecords, String shardId) {
+    // Without this, missing envelopes would surface as NoSuchElementException and surplus ones would go unnoticed.
+    Assert.assertEquals(inputRecords.size(), outputRecords.size());
+
+    Iterator<IncomingMessageEnvelope> outputRecordsIter = outputRecords.iterator();
+    for (Record record : inputRecords) {
+      IncomingMessageEnvelope envelope = outputRecordsIter.next();
+      KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+      Assert.assertEquals(record.getPartitionKey(), envelope.getKey());
+      Assert.assertEquals(record.getSequenceNumber(), kinesisMessageEnvelope.getSequenceNumber());
+      Assert.assertEquals(record.getApproximateArrivalTimestamp(),
+          kinesisMessageEnvelope.getApproximateArrivalTimestamp());
+      Assert.assertEquals(shardId, kinesisMessageEnvelope.getShardId());
+      ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
+      // Reset the input buffer's position before comparing it with the output bytes.
+      record.getData().rewind();
+      Assert.assertEquals(record.getData(), outputData);
+      verifyOffset(envelope.getOffset(), record, shardId);
+    }
+  }
+
+  /**
+   * Parses the offset string and checks that it carries the input record's sequence number and the expected shard
+   * id.
+   */
+  private void verifyOffset(String offset, Record inputRecord, String shardId) {
+    KinesisSystemConsumerOffset ckpt = KinesisSystemConsumerOffset.parse(offset);
+    // JUnit's argument order is (expected, actual); the input record and shard id are the expected values.
+    Assert.assertEquals(inputRecord.getSequenceNumber(), ckpt.getSeqNumber());
+    Assert.assertEquals(shardId, ckpt.getShardId());
+  }
+
+  /**
+   * Reads the consumer's private "metrics" field via reflection and initializes it for the given stream.
+   */
+  private void initializeMetrics(KinesisSystemConsumer consumer, String stream)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("metrics");
+    f.setAccessible(true);
+    // Plain (non-generic) cast, so no unchecked-warning suppression is needed on this method.
+    KinesisSystemConsumerMetrics metrics = (KinesisSystemConsumerMetrics) f.get(consumer);
+    metrics.initializeMetrics(Collections.singleton(stream));
+  }
+
+  /** Reads the consumer's private "processors" field via reflection. */
+  @SuppressWarnings("unchecked")
+  private Map<SystemStreamPartition, KinesisRecordProcessor> getProcessorMap(KinesisSystemConsumer consumer)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field processorsField = consumer.getClass().getDeclaredField("processors");
+    processorsField.setAccessible(true);
+    Object fieldValue = processorsField.get(consumer);
+    return (Map<SystemStreamPartition, KinesisRecordProcessor>) fieldValue;
+  }
+
+  /**
+   * Returns whether the allocator's private "availableSsps" map (read via reflection) lists {@code ssp} under its
+   * stream name.
+   */
+  @SuppressWarnings("unchecked")
+  private boolean isSspAvailable(KinesisSystemConsumer consumer, SystemStreamPartition ssp)
+      throws NoSuchFieldException, IllegalAccessException {
+    SSPAllocator allocator = getSspAllocator(consumer);
+    Field availableField = allocator.getClass().getDeclaredField("availableSsps");
+    availableField.setAccessible(true);
+    Map<String, Set<SystemStreamPartition>> availableByStream =
+        (Map<String, Set<SystemStreamPartition>>) availableField.get(allocator);
+    if (!availableByStream.containsKey(ssp.getStream())) {
+      return false;
+    }
+    return availableByStream.get(ssp.getStream()).contains(ssp);
+  }
+
+  /** Reads the consumer's private "sspAllocator" field via reflection. */
+  private SSPAllocator getSspAllocator(KinesisSystemConsumer consumer)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field allocatorField = consumer.getClass().getDeclaredField("sspAllocator");
+    allocatorField.setAccessible(true);
+    Object fieldValue = allocatorField.get(consumer);
+    return (SSPAllocator) fieldValue;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
----------------------------------------------------------------------
diff --git 
a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
new file mode 100644
index 0000000..615a06e
--- /dev/null
+++ 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Checks {@code equals} on {@link KinesisSystemConsumerOffset} instances that have gone through a
+ * {@code toString()} / {@code parse()} round trip.
+ */
+public class TestKinesisSystemConsumerOffset {
+  /**
+   * An offset rebuilt from its own string form must equal the offset it was built from.
+   */
+  @Test
+  public void testEquality() {
+    KinesisSystemConsumerOffset original = new KinesisSystemConsumerOffset("shard-00000", "123456");
+    Assert.assertEquals(original, roundTrip(original));
+  }
+
+  /**
+   * Offsets that differ in either the shard id or the sequence number must not compare equal.
+   */
+  @Test
+  public void testInEquality() {
+    KinesisSystemConsumerOffset reference = new KinesisSystemConsumerOffset("shard-00000", "123456");
+
+    // Same seqNumber, different shardId
+    KinesisSystemConsumerOffset otherShard = roundTrip(new KinesisSystemConsumerOffset("shard-00001", "123456"));
+    Assert.assertFalse(reference.equals(otherShard));
+
+    // Same shardId, different seqNumber
+    KinesisSystemConsumerOffset otherSeqNumber = roundTrip(new KinesisSystemConsumerOffset("shard-00000", "123457"));
+    Assert.assertFalse(reference.equals(otherSeqNumber));
+  }
+
+  /**
+   * Converts the offset to its string form and parses that string back into an offset.
+   */
+  private static KinesisSystemConsumerOffset roundTrip(KinesisSystemConsumerOffset offset) {
+    String serialized = offset.toString();
+    return KinesisSystemConsumerOffset.parse(serialized);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
----------------------------------------------------------------------
diff --git 
a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
new file mode 100644
index 0000000..0533a29
--- /dev/null
+++ 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link SSPAllocator}: allocating the partitions of a stream and freeing them again.
+ * Every test starts from an allocator to which two partitions of one stream have been freed.
+ */
+public class TestSSPAllocator {
+  private static final String SYSTEM = "kinesis";
+  private static final String STREAM = "stream";
+  private static final int NUM_PARTITIONS = 2;
+
+  /**
+   * Allocates both partitions and frees them again, checking after every step which partitions
+   * the allocator still holds as available.
+   */
+  @Test
+  public void testAllocateAndFree()
+      throws NoAvailablePartitionException, NoSuchFieldException, IllegalAccessException {
+    List<SystemStreamPartition> ssps = createSsps();
+    SSPAllocator allocator = createAllocator(ssps);
+    SystemStreamPartition first = ssps.get(0);
+    SystemStreamPartition second = ssps.get(1);
+
+    // Both partitions were freed during setup.
+    Assert.assertTrue(isSspAvailable(allocator, first));
+    Assert.assertTrue(isSspAvailable(allocator, second));
+
+    // The first allocation is expected to hand out partition 0.
+    SystemStreamPartition allocated = allocator.allocate(STREAM);
+    Assert.assertFalse(isSspAvailable(allocator, first));
+    Assert.assertTrue(isSspAvailable(allocator, second));
+    Assert.assertEquals(allocated, first);
+
+    // The second allocation is expected to hand out partition 1.
+    allocated = allocator.allocate(STREAM);
+    Assert.assertFalse(isSspAvailable(allocator, first));
+    Assert.assertFalse(isSspAvailable(allocator, second));
+    Assert.assertEquals(allocated, second);
+
+    // Freeing makes the partitions available again, one at a time.
+    allocator.free(second);
+    Assert.assertFalse(isSspAvailable(allocator, first));
+    Assert.assertTrue(isSspAvailable(allocator, second));
+
+    allocator.free(first);
+    Assert.assertTrue(isSspAvailable(allocator, first));
+    Assert.assertTrue(isSspAvailable(allocator, second));
+  }
+
+  /**
+   * Asking for a third partition when only two were made available must fail.
+   */
+  @Test(expected = NoAvailablePartitionException.class)
+  public void testAssignMoreThanMaxPartitions() throws NoAvailablePartitionException {
+    SSPAllocator allocator = createAllocator(createSsps());
+
+    allocator.allocate(STREAM);
+    allocator.allocate(STREAM);
+    allocator.allocate(STREAM); // An exception should be thrown at this point.
+  }
+
+  /**
+   * Freeing the same allocated partition a second time must be rejected.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testFreeSameSspTwice() throws NoAvailablePartitionException {
+    SSPAllocator allocator = createAllocator(createSsps());
+
+    SystemStreamPartition allocated = allocator.allocate(STREAM);
+    allocator.free(allocated);
+    allocator.free(allocated); // An exception should be thrown at this point.
+  }
+
+  /**
+   * Freeing a partition that has not been allocated must be rejected.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testFreeUnallocatedSsp() throws NoAvailablePartitionException {
+    List<SystemStreamPartition> ssps = createSsps();
+    SSPAllocator allocator = createAllocator(ssps);
+
+    allocator.allocate(STREAM);
+    allocator.free(ssps.get(1)); // An exception should be thrown at this point.
+  }
+
+  /**
+   * Builds one SSP per partition id in {@code [0, NUM_PARTITIONS)} for the test system and stream.
+   */
+  private static List<SystemStreamPartition> createSsps() {
+    List<SystemStreamPartition> ssps = new ArrayList<>();
+    IntStream.range(0, NUM_PARTITIONS)
+        .mapToObj(id -> new SystemStreamPartition(SYSTEM, STREAM, new Partition(id)))
+        .forEach(ssps::add);
+    return ssps;
+  }
+
+  /**
+   * Creates a new allocator and frees every given SSP to it, in list order.
+   */
+  private static SSPAllocator createAllocator(List<SystemStreamPartition> ssps) {
+    SSPAllocator allocator = new SSPAllocator();
+    for (SystemStreamPartition ssp : ssps) {
+      allocator.free(ssp);
+    }
+    return allocator;
+  }
+
+  /**
+   * Checks whether the allocator lists {@code ssp} under its stream in the allocator's private
+   * {@code availableSsps} map, which is read through reflection.
+   */
+  @SuppressWarnings("unchecked")
+  private boolean isSspAvailable(SSPAllocator sspAllocator, SystemStreamPartition ssp)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field availableField = sspAllocator.getClass().getDeclaredField("availableSsps");
+    availableField.setAccessible(true);
+    Map<String, Set<SystemStreamPartition>> sspsByStream =
+        (Map<String, Set<SystemStreamPartition>>) availableField.get(sspAllocator);
+
+    if (!sspsByStream.containsKey(ssp.getStream())) {
+      return false;
+    }
+    return sspsByStream.get(ssp.getStream()).contains(ssp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 0fe3dfa..e50e816 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -24,9 +24,8 @@ include \
   'samza-rest',
   'samza-shell',
   'samza-azure',
-  'samza-sql'
-
-
+  'samza-sql',
+  'samza-aws'
 
 def scalaModules = [
         'samza-core',

Reply via email to