http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java new file mode 100644 index 0000000..7ca8e0b --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import static com.google.common.collect.Lists.newArrayList; +import static com.google.common.collect.Lists.transform; +import com.google.common.base.Function; + +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.ResponseMetadata; +import com.amazonaws.regions.Region; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest; +import com.amazonaws.services.kinesis.model.AddTagsToStreamResult; +import com.amazonaws.services.kinesis.model.CreateStreamRequest; +import com.amazonaws.services.kinesis.model.CreateStreamResult; +import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodRequest; +import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult; +import com.amazonaws.services.kinesis.model.DeleteStreamRequest; +import com.amazonaws.services.kinesis.model.DeleteStreamResult; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringRequest; +import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringResult; +import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringRequest; +import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringResult; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.GetShardIteratorResult; +import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest; +import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult; +import com.amazonaws.services.kinesis.model.ListStreamsRequest; +import com.amazonaws.services.kinesis.model.ListStreamsResult; +import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest; +import com.amazonaws.services.kinesis.model.ListTagsForStreamResult; +import com.amazonaws.services.kinesis.model.MergeShardsRequest; +import com.amazonaws.services.kinesis.model.MergeShardsResult; +import com.amazonaws.services.kinesis.model.PutRecordRequest; +import com.amazonaws.services.kinesis.model.PutRecordResult; +import com.amazonaws.services.kinesis.model.PutRecordsRequest; +import com.amazonaws.services.kinesis.model.PutRecordsResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamRequest; +import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamResult; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import com.amazonaws.services.kinesis.model.SplitShardRequest; +import com.amazonaws.services.kinesis.model.SplitShardResult; +import com.amazonaws.services.kinesis.model.StreamDescription; +import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode; +import org.apache.commons.lang.builder.EqualsBuilder; +import org.joda.time.Instant; +import static java.lang.Integer.parseInt; +import static java.lang.Math.min; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.List; +import javax.annotation.Nullable; + +/** + * Created by p.pastuszka on 21.07.2016. + */ +class AmazonKinesisMock implements AmazonKinesis { + + static class TestData implements Serializable { + private final String data; + private final Instant arrivalTimestamp; + private final String sequenceNumber; + + public TestData(KinesisRecord record) { + this(new String(record.getData().array()), + record.getApproximateArrivalTimestamp(), + record.getSequenceNumber()); + } + + public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) { + this.data = data; + this.arrivalTimestamp = arrivalTimestamp; + this.sequenceNumber = sequenceNumber; + } + + public Record convertToRecord() { + return new Record(). + withApproximateArrivalTimestamp(arrivalTimestamp.toDate()). + withData(ByteBuffer.wrap(data.getBytes())). + withSequenceNumber(sequenceNumber). + withPartitionKey(""); + } + + @Override + public boolean equals(Object obj) { + return EqualsBuilder.reflectionEquals(this, obj); + } + + @Override + public int hashCode() { + return reflectionHashCode(this); + } + } + + static class Provider implements KinesisClientProvider { + + private final List<List<TestData>> shardedData; + private final int numberOfRecordsPerGet; + + public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) { + this.shardedData = shardedData; + this.numberOfRecordsPerGet = numberOfRecordsPerGet; + } + + @Override + public AmazonKinesis get() { + return new AmazonKinesisMock(transform(shardedData, + new Function<List<TestData>, List<Record>>() { + @Override + public List<Record> apply(@Nullable List<TestData> testDatas) { + return transform(testDatas, new Function<TestData, Record>() { + @Override + public Record apply(@Nullable TestData testData) { + return testData.convertToRecord(); + } + }); + } + }), numberOfRecordsPerGet); + } + } + + private final List<List<Record>> shardedData; + private final int numberOfRecordsPerGet; + + public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) { + this.shardedData = shardedData; + this.numberOfRecordsPerGet = numberOfRecordsPerGet; + } + + @Override + public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) { + String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":"); + int shardId = parseInt(shardIteratorParts[0]); + int startingRecord = parseInt(shardIteratorParts[1]); + List<Record> shardData = shardedData.get(shardId); + + int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size()); + int fromIndex = min(startingRecord, toIndex); + return new GetRecordsResult(). + withRecords(shardData.subList(fromIndex, toIndex)). + withNextShardIterator(String.format("%s:%s", shardId, toIndex)); + } + + @Override + public GetShardIteratorResult getShardIterator( + GetShardIteratorRequest getShardIteratorRequest) { + ShardIteratorType shardIteratorType = ShardIteratorType.fromValue( + getShardIteratorRequest.getShardIteratorType()); + + String shardIterator; + if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) { + shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0); + } else { + throw new RuntimeException("Not implemented"); + } + + return new GetShardIteratorResult().withShardIterator(shardIterator); + } + + @Override + public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) { + int nextShardId = 0; + if (exclusiveStartShardId != null) { + nextShardId = parseInt(exclusiveStartShardId) + 1; + } + boolean hasMoreShards = nextShardId + 1 < shardedData.size(); + + List<Shard> shards = newArrayList(); + if (nextShardId < shardedData.size()) { + shards.add(new Shard().withShardId(Integer.toString(nextShardId))); + } + + return new DescribeStreamResult().withStreamDescription( + new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards) + ); + } + + @Override + public void setEndpoint(String endpoint) { + + } + + @Override + public void setRegion(Region region) { + + } + + @Override + public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public CreateStreamResult createStream(String streamName, Integer shardCount) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod( + DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DeleteStreamResult deleteStream(String streamName) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DescribeStreamResult describeStream(String streamName) { + + throw new RuntimeException("Not implemented"); + } + + @Override + public DescribeStreamResult describeStream(String streamName, + Integer limit, String exclusiveStartShardId) { + throw new RuntimeException("Not implemented"); + } + + @Override + public DisableEnhancedMonitoringResult disableEnhancedMonitoring( + DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public EnableEnhancedMonitoringResult enableEnhancedMonitoring( + EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public GetShardIteratorResult getShardIterator(String streamName, + String shardId, + String shardIteratorType) { + throw new RuntimeException("Not implemented"); + } + + @Override + public GetShardIteratorResult getShardIterator(String streamName, + String shardId, + String shardIteratorType, + String startingSequenceNumber) { + throw new RuntimeException("Not implemented"); + } + + @Override + public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod( + IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public ListStreamsResult listStreams() { + throw new RuntimeException("Not implemented"); + } + + @Override + public ListStreamsResult listStreams(String exclusiveStartStreamName) { + throw new RuntimeException("Not implemented"); + } + + @Override + public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) { + throw new RuntimeException("Not implemented"); + } + + @Override + public ListTagsForStreamResult listTagsForStream( + ListTagsForStreamRequest listTagsForStreamRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public MergeShardsResult mergeShards(String streamName, + String shardToMerge, String adjacentShardToMerge) { + throw new RuntimeException("Not implemented"); + } + + @Override + public PutRecordResult putRecord(PutRecordRequest putRecordRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) { + throw new RuntimeException("Not implemented"); + } + + @Override + public PutRecordResult putRecord(String streamName, ByteBuffer data, + String partitionKey, String sequenceNumberForOrdering) { + throw new RuntimeException("Not implemented"); + } + + @Override + public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public RemoveTagsFromStreamResult removeTagsFromStream( + RemoveTagsFromStreamRequest removeTagsFromStreamRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public SplitShardResult splitShard(SplitShardRequest splitShardRequest) { + throw new RuntimeException("Not implemented"); + } + + @Override + public SplitShardResult splitShard(String streamName, + String shardToSplit, String newStartingHashKey) { + throw new RuntimeException("Not implemented"); + } + + @Override + public void shutdown() { + + } + + @Override + public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) { + throw new RuntimeException("Not implemented"); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java new file mode 100644 index 0000000..152fd6d --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import org.junit.Test; +import java.util.NoSuchElementException; + +/** + * Created by ppastuszka on 12.12.15. + */ +public class CustomOptionalTest { + @Test(expected = NoSuchElementException.class) + public void absentThrowsNoSuchElementExceptionOnGet() { + CustomOptional.absent().get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java new file mode 100644 index 0000000..a9e5a69 --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.model.Shard; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import static java.util.Arrays.asList; + + +/*** + */ +@RunWith(MockitoJUnitRunner.class) +public class DynamicCheckpointGeneratorTest { + + @Mock + private SimplifiedKinesisClient kinesisClient; + @Mock + private Shard shard1, shard2, shard3; + + @Test + public void shouldMapAllShardsToCheckpoints() throws Exception { + given(shard1.getShardId()).willReturn("shard-01"); + given(shard2.getShardId()).willReturn("shard-02"); + given(shard3.getShardId()).willReturn("shard-03"); + given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3)); + + StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST); + DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream", + startingPoint); + + KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient); + + assertThat(checkpoint).hasSize(3); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java new file mode 100644 index 0000000..61a858f --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import static com.google.common.collect.Lists.newArrayList; + +import com.google.common.collect.Iterables; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import org.joda.time.DateTime; +import org.junit.Test; +import java.util.List; + +/** + * Created by p.pastuszka on 22.07.2016. + */ +public class KinesisMockReadTest { + @Test + public void readsDataFromMockKinesis() { + int noOfShards = 3; + int noOfEventsPerShard = 100; + List<List<AmazonKinesisMock.TestData>> testData = + provideTestData(noOfShards, noOfEventsPerShard); + + final Pipeline p = TestPipeline.create(); + PCollection<AmazonKinesisMock.TestData> result = p. + apply( + KinesisIO.Read. + from("stream", InitialPositionInStream.TRIM_HORIZON). + using(new AmazonKinesisMock.Provider(testData, 10)). + withMaxNumRecords(noOfShards * noOfEventsPerShard)). + apply(ParDo.of(new KinesisRecordToTestData())); + PAssert.that(result).containsInAnyOrder(Iterables.concat(testData)); + p.run(); + } + + private static class KinesisRecordToTestData extends + DoFn<KinesisRecord, AmazonKinesisMock.TestData> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + c.output(new AmazonKinesisMock.TestData(c.element())); + } + } + + private List<List<AmazonKinesisMock.TestData>> provideTestData( + int noOfShards, + int noOfEventsPerShard) { + + int seqNumber = 0; + + List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList(); + for (int i = 0; i < noOfShards; ++i) { + List<AmazonKinesisMock.TestData> shardData = newArrayList(); + shardedData.add(shardData); + + DateTime arrival = DateTime.now(); + for (int j = 0; j < noOfEventsPerShard; ++j) { + arrival = arrival.plusSeconds(1); + + seqNumber++; + shardData.add(new AmazonKinesisMock.TestData( + Integer.toString(seqNumber), + arrival.toInstant(), + Integer.toString(seqNumber)) + ); + } + } + + return shardedData; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java new file mode 100644 index 0000000..205f050 --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + + +import com.google.common.collect.Iterables; + +import static org.assertj.core.api.Assertions.assertThat; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import static java.util.Arrays.asList; +import java.util.Iterator; +import java.util.List; + +/*** + * + */ +@RunWith(MockitoJUnitRunner.class) +public class KinesisReaderCheckpointTest { + @Mock + private ShardCheckpoint a, b, c; + + private KinesisReaderCheckpoint checkpoint; + + @Before + public void setUp() { + checkpoint = new KinesisReaderCheckpoint(asList(a, b, c)); + } + + @Test + public void splitsCheckpointAccordingly() { + verifySplitInto(1); + verifySplitInto(2); + verifySplitInto(3); + verifySplitInto(4); + } + + @Test(expected = UnsupportedOperationException.class) + public void isImmutable() { + Iterator<ShardCheckpoint> iterator = checkpoint.iterator(); + iterator.remove(); + } + + private void verifySplitInto(int size) { + List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size); + assertThat(Iterables.concat(split)).containsOnly(a, b, c); + assertThat(split).hasSize(Math.min(size, 3)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java new file mode 100644 index 0000000..fbc7c66 --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Lists.newArrayList; + +import com.amazonaws.regions.Regions; +import static org.assertj.core.api.Assertions.assertThat; +import org.apache.commons.lang.RandomStringUtils; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Ignore; +import org.junit.Test; +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Integration test, that reads from the real Kinesis. + * You need to provide all {@link KinesisTestOptions} in order to run this. + */ +public class KinesisReaderIT { + private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10); + private ExecutorService singleThreadExecutor = newSingleThreadExecutor(); + + + @Ignore + @Test + public void readsDataFromRealKinesisStream() + throws IOException, InterruptedException, ExecutionException { + KinesisTestOptions options = readKinesisOptions(); + List<String> testData = prepareTestData(1000); + + Future<?> future = startTestPipeline(testData, options); + KinesisUploader.uploadAll(testData, options); + future.get(); + } + + private List<String> prepareTestData(int count) { + List<String> data = newArrayList(); + for (int i = 0; i < count; ++i) { + data.add(RandomStringUtils.randomAlphabetic(32)); + } + return data; + } + + private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options) + throws InterruptedException { + final Pipeline p = TestPipeline.create(); + PCollection<String> result = p. + apply(KinesisIO.Read. + from(options.getAwsKinesisStream(), Instant.now()). + using(options.getAwsAccessKey(), options.getAwsSecretKey(), + Regions.fromName(options.getAwsKinesisRegion())). + withMaxReadTime(Duration.standardMinutes(3)) + ). + apply(ParDo.of(new RecordDataToString())); + PAssert.that(result).containsInAnyOrder(testData); + + Future<?> future = singleThreadExecutor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + PipelineResult result = p.run(); + PipelineResult.State state = result.getState(); + while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) { + Thread.sleep(1000); + state = result.getState(); + } + assertThat(state).isEqualTo(PipelineResult.State.DONE); + return null; + } + }); + Thread.sleep(PIPELINE_STARTUP_TIME); + return future; + } + + private KinesisTestOptions readKinesisOptions() { + PipelineOptionsFactory.register(KinesisTestOptions.class); + return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class); + } + + private static class RecordDataToString extends DoFn<KinesisRecord, String> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + checkNotNull(c.element(), "Null record given"); + c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java new file mode 100644 index 0000000..793fb57 --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import static java.util.Arrays.asList; +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Created by ppastuszka on 12.12.15. + */ +@RunWith(MockitoJUnitRunner.class) +public class KinesisReaderTest { + @Mock + private SimplifiedKinesisClient kinesis; + @Mock + private CheckpointGenerator generator; + @Mock + private ShardCheckpoint firstCheckpoint, secondCheckpoint; + @Mock + private ShardRecordsIterator firstIterator, secondIterator; + @Mock + private KinesisRecord a, b, c, d; + + private KinesisReader reader; + + @Before + public void setUp() throws IOException, TransientKinesisException { + when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint( + asList(firstCheckpoint, secondCheckpoint) + )); + when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator); + when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator); + when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent()); + when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent()); + + reader = new KinesisReader(kinesis, generator, null); + } + + @Test + public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException { + assertThat(reader.start()).isFalse(); + } + + @Test(expected = NoSuchElementException.class) + public void throwsNoSuchElementExceptionIfNoData() throws IOException { + reader.start(); + reader.getCurrent(); + } + + @Test + public void startReturnsTrueIfSomeDataAvailable() throws IOException, + TransientKinesisException { + when(firstIterator.next()). + thenReturn(CustomOptional.of(a)). + thenReturn(CustomOptional.<KinesisRecord>absent()); + + assertThat(reader.start()).isTrue(); + } + + @Test + public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis() + throws IOException, TransientKinesisException { + reader.start(); + + when(firstIterator.next()).thenThrow(TransientKinesisException.class); + + assertThat(reader.advance()).isFalse(); + } + + @Test + public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException { + when(firstIterator.next()). + thenReturn(CustomOptional.<KinesisRecord>absent()). + thenReturn(CustomOptional.of(a)). + thenReturn(CustomOptional.<KinesisRecord>absent()). + thenReturn(CustomOptional.of(b)). + thenReturn(CustomOptional.<KinesisRecord>absent()); + + when(secondIterator.next()). + thenReturn(CustomOptional.of(c)). + thenReturn(CustomOptional.<KinesisRecord>absent()). + thenReturn(CustomOptional.of(d)). + thenReturn(CustomOptional.<KinesisRecord>absent()); + + assertThat(reader.start()).isTrue(); + assertThat(reader.getCurrent()).isEqualTo(c); + assertThat(reader.advance()).isTrue(); + assertThat(reader.getCurrent()).isEqualTo(a); + assertThat(reader.advance()).isTrue(); + assertThat(reader.getCurrent()).isEqualTo(d); + assertThat(reader.advance()).isTrue(); + assertThat(reader.getCurrent()).isEqualTo(b); + assertThat(reader.advance()).isFalse(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java new file mode 100644 index 0000000..b09b7eb --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import org.apache.beam.sdk.testing.CoderProperties; + +import org.joda.time.Instant; +import org.junit.Test; +import java.nio.ByteBuffer; + +/** + * Created by p.pastuszka on 20.07.2016. + */ +public class KinesisRecordCoderTest { + @Test + public void encodingAndDecodingWorks() throws Exception { + KinesisRecord record = new KinesisRecord( + ByteBuffer.wrap("data".getBytes()), + "sequence", + 128L, + "partition", + Instant.now(), + Instant.now(), + "stream", + "shard" + ); + CoderProperties.coderDecodeEncodeEqual( + new KinesisRecordCoder(), record + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java new file mode 100644 index 0000000..65a7605 --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.testing.TestPipelineOptions; + +/*** + * Options for Kinesis integration tests. + */ +public interface KinesisTestOptions extends TestPipelineOptions { + @Description("AWS region where Kinesis stream resided") + @Default.String("aws-kinesis-region") + String getAwsKinesisRegion(); + void setAwsKinesisRegion(String value); + + @Description("Kinesis stream name") + @Default.String("aws-kinesis-stream") + String getAwsKinesisStream(); + void setAwsKinesisStream(String value); + + @Description("AWS secret key") + @Default.String("aws-secret-key") + String getAwsSecretKey(); + void setAwsSecretKey(String value); + + @Description("AWS access key") + @Default.String("aws-access-key") + String getAwsAccessKey(); + void setAwsAccessKey(String value); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java new file mode 100644 index 0000000..0dcede9 --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import static com.google.common.collect.Lists.newArrayList; +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.internal.StaticCredentialsProvider; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.PutRecordsRequest; +import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; +import com.amazonaws.services.kinesis.model.PutRecordsResult; +import com.amazonaws.services.kinesis.model.PutRecordsResultEntry; +import java.nio.ByteBuffer; +import java.util.List; + +/*** + * Sends records to Kinesis in reliable way. + */ +public class KinesisUploader { + + public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499; + + public static void uploadAll(List<String> data, KinesisTestOptions options) { + AmazonKinesis client = new AmazonKinesisClient( + new StaticCredentialsProvider( + new BasicAWSCredentials( + options.getAwsAccessKey(), options.getAwsSecretKey())) + ).withRegion(Regions.fromName(options.getAwsKinesisRegion())); + + List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH); + + + for (List<String> partition : partitions) { + List<PutRecordsRequestEntry> allRecords = newArrayList(); + for (String row : partition) { + allRecords.add(new PutRecordsRequestEntry(). + withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))). + withPartitionKey(Integer.toString(row.hashCode())) + + ); + } + + PutRecordsResult result; + do { + result = client.putRecords( + new PutRecordsRequest(). + withStreamName(options.getAwsKinesisStream()). + withRecords(allRecords)); + List<PutRecordsRequestEntry> failedRecords = newArrayList(); + int i = 0; + for (PutRecordsResultEntry row : result.getRecords()) { + if (row.getErrorCode() != null) { + failedRecords.add(allRecords.get(i)); + } + ++i; + } + allRecords = failedRecords; + } + + while (result.getFailedRecordCount() > 0); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java new file mode 100644 index 0000000..360106d --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import com.google.common.collect.Lists; + +import static org.mockito.BDDMockito.given; +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import java.util.Collections; +import java.util.List; + + +/*** + */ +@RunWith(MockitoJUnitRunner.class) +public class RecordFilterTest { + @Mock + private ShardCheckpoint checkpoint; + @Mock + private KinesisRecord record1, record2, record3, record4, record5; + + @Test + public void shouldFilterOutRecordsBeforeOrAtCheckpoint() { + given(checkpoint.isBeforeOrAt(record1)).willReturn(false); + given(checkpoint.isBeforeOrAt(record2)).willReturn(true); + given(checkpoint.isBeforeOrAt(record3)).willReturn(true); + given(checkpoint.isBeforeOrAt(record4)).willReturn(false); + given(checkpoint.isBeforeOrAt(record5)).willReturn(true); + List<KinesisRecord> records = Lists.newArrayList(record1, record2, + record3, record4, record5); + RecordFilter underTest = new RecordFilter(); + + List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint); + + Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5); + } + + @Test + public void shouldNotFailOnEmptyList() { + List<KinesisRecord> records = Collections.emptyList(); + RecordFilter underTest = new RecordFilter(); + + List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint); + + Assertions.assertThat(retainedRecords).isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java new file mode 100644 index 0000000..a508ddf --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import static com.google.common.collect.Lists.newArrayList; + +import static org.assertj.core.api.Assertions.assertThat; +import org.junit.Test; +import java.util.Collections; +import java.util.List; + +/** + * Created by ppastuszka on 12.12.15. + */ +public class RoundRobinTest { + @Test(expected = IllegalArgumentException.class) + public void doesNotAllowCreationWithEmptyCollection() { + new RoundRobin<>(Collections.emptyList()); + } + + @Test + public void goesThroughElementsInCycle() { + List<String> input = newArrayList("a", "b", "c"); + + RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input)); + + input.addAll(input); // duplicate the input + for (String element : input) { + assertThat(roundRobin.getCurrent()).isEqualTo(element); + assertThat(roundRobin.getCurrent()).isEqualTo(element); + roundRobin.moveForward(); + } + } + + @Test + public void usualIteratorGoesThroughElementsOnce() { + List<String> input = newArrayList("a", "b", "c"); + + RoundRobin<String> roundRobin = new RoundRobin<>(input); + assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0])); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java new file mode 100644 index 0000000..2227cef --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.LATEST; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.TRIM_HORIZON; +import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER; +import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER; +import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import org.joda.time.DateTime; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import java.io.IOException; + +/** + * + */ +@RunWith(MockitoJUnitRunner.class) +public class ShardCheckpointTest { + private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT"; + private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT"; + private static final String STREAM_NAME = "STREAM"; + private static final String SHARD_ID = "SHARD_ID"; + @Mock + private SimplifiedKinesisClient client; + + @Before + public void setUp() throws IOException, TransientKinesisException { + when(client.getShardIterator( + eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER), + anyString(), isNull(Instant.class))). + thenReturn(AT_SEQUENCE_SHARD_IT); + when(client.getShardIterator( + eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER), + anyString(), isNull(Instant.class))). + thenReturn(AFTER_SEQUENCE_SHARD_IT); + } + + @Test + public void testProvidingShardIterator() throws IOException, TransientKinesisException { + assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client)) + .isEqualTo(AT_SEQUENCE_SHARD_IT); + assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client)) + .isEqualTo(AFTER_SEQUENCE_SHARD_IT); + assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo + (AT_SEQUENCE_SHARD_IT); + assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)) + .isEqualTo(AT_SEQUENCE_SHARD_IT); + } + + @Test + public void testComparisonWithExtendedSequenceNumber() { + assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isTrue(); + + assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isTrue(); + + assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isTrue(); + + assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isTrue(); + + assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isFalse(); + + assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("100", 0L)) + )).isFalse(); + + assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt( + recordWith(new ExtendedSequenceNumber("99", 1L)) + )).isFalse(); + } + + @Test + public void testComparisonWithTimestamp() { + DateTime referenceTimestamp = DateTime.now(); + + assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant()) + .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant())) + ).isFalse(); + + assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant()) + .isBeforeOrAt(recordWith(referenceTimestamp.toInstant())) + ).isTrue(); + + assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant()) + .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant())) + ).isTrue(); + } + + private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) { + KinesisRecord record = mock(KinesisRecord.class); + given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber); + return record; + } + + private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber, + Long subSequenceNumber) { + return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber, + subSequenceNumber); + } + + private KinesisRecord recordWith(Instant approximateArrivalTimestamp) { + KinesisRecord record = mock(KinesisRecord.class); + given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp); + return record; + } + + private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) { + return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java new file mode 100644 index 0000000..e2a3ccc --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyListOf; +import static org.mockito.Mockito.when; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import java.io.IOException; +import java.util.Collections; + +/** + * Created by ppastuszka on 12.12.15. + */ +@RunWith(MockitoJUnitRunner.class) +public class ShardRecordsIteratorTest { + private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR"; + private static final String SECOND_ITERATOR = "SECOND_ITERATOR"; + private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR"; + private static final String THIRD_ITERATOR = "THIRD_ITERATOR"; + private static final String STREAM_NAME = "STREAM_NAME"; + private static final String SHARD_ID = "SHARD_ID"; + + @Mock + private SimplifiedKinesisClient kinesisClient; + @Mock + private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint; + @Mock + private GetKinesisRecordsResult firstResult, secondResult, thirdResult; + @Mock + private KinesisRecord a, b, c, d; + @Mock + private RecordFilter recordFilter; + + private ShardRecordsIterator iterator; + + @Before + public void setUp() throws IOException, TransientKinesisException { + when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR); + when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME); + when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID); + + when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint); + when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint); + when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME); + when(aCheckpoint.getShardId()).thenReturn(SHARD_ID); + when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint); + when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME); + when(bCheckpoint.getShardId()).thenReturn(SHARD_ID); + when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint); + when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME); + when(cCheckpoint.getShardId()).thenReturn(SHARD_ID); + when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME); + when(dCheckpoint.getShardId()).thenReturn(SHARD_ID); + + when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID)) + .thenReturn(firstResult); + when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID)) + .thenReturn(secondResult); + when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID)) + .thenReturn(thirdResult); + + when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR); + when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR); + when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR); + + when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList()); + when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList()); + when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList()); + + when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint + .class))).thenAnswer(new IdentityAnswer()); + + iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter); + } + + @Test + public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException { + assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); + assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); + assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); + } + + @Test + public void goesThroughAvailableRecords() throws IOException, TransientKinesisException { + when(firstResult.getRecords()).thenReturn(asList(a, b, c)); + when(secondResult.getRecords()).thenReturn(singletonList(d)); + + assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint); + assertThat(iterator.next()).isEqualTo(CustomOptional.of(a)); + assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint); + assertThat(iterator.next()).isEqualTo(CustomOptional.of(b)); + assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint); + assertThat(iterator.next()).isEqualTo(CustomOptional.of(c)); + assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint); + assertThat(iterator.next()).isEqualTo(CustomOptional.of(d)); + assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint); + assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); + assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint); + } + + @Test + public void refreshesExpiredIterator() throws IOException, TransientKinesisException { + when(firstResult.getRecords()).thenReturn(singletonList(a)); + when(secondResult.getRecords()).thenReturn(singletonList(b)); + + when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID)) + .thenThrow(ExpiredIteratorException.class); + when(aCheckpoint.getShardIterator(kinesisClient)) + .thenReturn(SECOND_REFRESHED_ITERATOR); + when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID)) + .thenReturn(secondResult); + + assertThat(iterator.next()).isEqualTo(CustomOptional.of(a)); + assertThat(iterator.next()).isEqualTo(CustomOptional.of(b)); + assertThat(iterator.next()).isEqualTo(CustomOptional.absent()); + } + + private static class IdentityAnswer implements Answer<Object> { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + return invocation.getArguments()[0]; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java new file mode 100644 index 0000000..44d29d6 --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonServiceException.ErrorType; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.GetShardIteratorResult; +import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import com.amazonaws.services.kinesis.model.StreamDescription; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.reset; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import java.util.List; + +/*** + */ +@RunWith(MockitoJUnitRunner.class) +public class SimplifiedKinesisClientTest { + private static final String STREAM = "stream"; + private static final String SHARD_1 = "shard-01"; + private static final String SHARD_2 = "shard-02"; + private static final String SHARD_3 = "shard-03"; + private static final String SHARD_ITERATOR = "iterator"; + private static final String SEQUENCE_NUMBER = "abc123"; + + @Mock + private AmazonKinesis kinesis; + @InjectMocks + private SimplifiedKinesisClient underTest; + + @Test + public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception { + given(kinesis.getShardIterator(new GetShardIteratorRequest() + .withStreamName(STREAM) + .withShardId(SHARD_1) + .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) + .withStartingSequenceNumber(SEQUENCE_NUMBER) + )).willReturn(new GetShardIteratorResult() + .withShardIterator(SHARD_ITERATOR)); + + String stream = underTest.getShardIterator(STREAM, SHARD_1, + ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null); + + assertThat(stream).isEqualTo(SHARD_ITERATOR); + } + + @Test + public void shouldReturnIteratorStartingWithTimestamp() throws Exception { + Instant timestamp = Instant.now(); + given(kinesis.getShardIterator(new GetShardIteratorRequest() + .withStreamName(STREAM) + .withShardId(SHARD_1) + .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) + .withTimestamp(timestamp.toDate()) + )).willReturn(new GetShardIteratorResult() + .withShardIterator(SHARD_ITERATOR)); + + String stream = underTest.getShardIterator(STREAM, SHARD_1, + ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp); + + assertThat(stream).isEqualTo(SHARD_ITERATOR); + } + + @Test + public void shouldHandleExpiredIterationExceptionForGetShardIterator() { + shouldHandleGetShardIteratorError(new ExpiredIteratorException(""), + ExpiredIteratorException.class); + } + + @Test + public void shouldHandleLimitExceededExceptionForGetShardIterator() { + shouldHandleGetShardIteratorError(new LimitExceededException(""), + TransientKinesisException.class); + } + + @Test + public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() { + shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""), + TransientKinesisException.class); + } + + @Test + public void shouldHandleServiceErrorForGetShardIterator() { + shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service), + TransientKinesisException.class); + } + + @Test + public void shouldHandleClientErrorForGetShardIterator() { + shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client), + RuntimeException.class); + } + + @Test + public void shouldHandleUnexpectedExceptionForGetShardIterator() { + shouldHandleGetShardIteratorError(new NullPointerException(), + RuntimeException.class); + } + + private void shouldHandleGetShardIteratorError( + Exception thrownException, + Class<? extends Exception> expectedExceptionClass) { + GetShardIteratorRequest request = new GetShardIteratorRequest() + .withStreamName(STREAM) + .withShardId(SHARD_1) + .withShardIteratorType(ShardIteratorType.LATEST); + + given(kinesis.getShardIterator(request)).willThrow(thrownException); + + try { + underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null); + failBecauseExceptionWasNotThrown(expectedExceptionClass); + } catch (Exception e) { + assertThat(e).isExactlyInstanceOf(expectedExceptionClass); + } finally { + reset(kinesis); + } + } + + @Test + public void shouldListAllShards() throws Exception { + Shard shard1 = new Shard().withShardId(SHARD_1); + Shard shard2 = new Shard().withShardId(SHARD_2); + Shard shard3 = new Shard().withShardId(SHARD_3); + given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult() + .withStreamDescription(new StreamDescription() + .withShards(shard1, shard2) + .withHasMoreShards(true))); + given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult() + .withStreamDescription(new StreamDescription() + .withShards(shard3) + .withHasMoreShards(false))); + + List<Shard> shards = underTest.listShards(STREAM); + + assertThat(shards).containsOnly(shard1, shard2, shard3); + } + + @Test + public void shouldHandleExpiredIterationExceptionForShardListing() { + shouldHandleShardListingError(new ExpiredIteratorException(""), + ExpiredIteratorException.class); + } + + @Test + public void shouldHandleLimitExceededExceptionForShardListing() { + shouldHandleShardListingError(new LimitExceededException(""), + TransientKinesisException.class); + } + + @Test + public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() { + shouldHandleShardListingError(new ProvisionedThroughputExceededException(""), + TransientKinesisException.class); + } + + @Test + public void shouldHandleServiceErrorForShardListing() { + shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service), + TransientKinesisException.class); + } + + @Test + public void shouldHandleClientErrorForShardListing() { + shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client), + RuntimeException.class); + } + + @Test + public void shouldHandleUnexpectedExceptionForShardListing() { + shouldHandleShardListingError(new NullPointerException(), + RuntimeException.class); + } + + private void shouldHandleShardListingError( + Exception thrownException, + Class<? extends Exception> expectedExceptionClass) { + given(kinesis.describeStream(STREAM, null)).willThrow(thrownException); + try { + underTest.listShards(STREAM); + failBecauseExceptionWasNotThrown(expectedExceptionClass); + } catch (Exception e) { + assertThat(e).isExactlyInstanceOf(expectedExceptionClass); + } finally { + reset(kinesis); + } + } + + private AmazonServiceException newAmazonServiceException(ErrorType errorType) { + AmazonServiceException exception = new AmazonServiceException(""); + exception.setErrorType(errorType); + return exception; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java new file mode 100644 index 0000000..44dbf4a --- /dev/null +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Transforms for reading and writing from Amazon Kinesis. + */ +package org.apache.beam.sdk.io.kinesis; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index 4198499..6cbd615 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -37,6 +37,7 @@ <module>hdfs</module> <module>jms</module> <module>kafka</module> + <module>kinesis</module> </modules> </project>