SAMZA-1515; Implement a consumer for Kinesis

Author: Aditya Toomula <atoom...@atoomula-ld1.linkedin.biz>
Reviewers: Jagadish<jagad...@apache.org>

Closes #368 from atoomula/kinesis

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9961023f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9961023f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9961023f

Branch: refs/heads/master
Commit: 9961023f7bf7c4b19804fb4e50a14c86d6fc9233
Parents: 5e68d62
Author: Aditya Toomula <atoom...@atoomula-ld1.linkedin.biz>
Authored: Tue Nov 28 13:12:10 2017 -0800
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Nov 28 13:12:10 2017 -0800

----------------------------------------------------------------------
 build.gradle | 33 ++
 .../kinesis/KinesisAWSCredentialsProvider.java | 69 +++++
 .../samza/system/kinesis/KinesisConfig.java | 287 ++++++++++++++++++
 .../system/kinesis/KinesisSystemAdmin.java | 124 ++++++++
 .../system/kinesis/KinesisSystemFactory.java | 87 ++++++
 .../KinesisIncomingMessageEnvelope.java | 62 ++++
 .../consumer/KinesisRecordProcessor.java | 208 +++++++++++++
 .../KinesisRecordProcessorListener.java | 51 ++++
 .../kinesis/consumer/KinesisSystemConsumer.java | 256 ++++++++++++++++
 .../consumer/KinesisSystemConsumerOffset.java | 107 +++++++
 .../consumer/NoAvailablePartitionException.java | 38 +++
 .../system/kinesis/consumer/SSPAllocator.java | 73 +++++
 .../metrics/KinesisSystemConsumerMetrics.java | 106 +++++++
 .../system/kinesis/metrics/SamzaHistogram.java | 63 ++++
 .../TestKinesisAWSCredentialsProvider.java | 60 ++++
 .../samza/system/kinesis/TestKinesisConfig.java | 132 ++++++++
 .../kinesis/TestKinesisSystemFactory.java | 115 +++++++
 .../consumer/TestKinesisRecordProcessor.java | 301 +++++++++++++++++++
 .../consumer/TestKinesisSystemConsumer.java | 270 +++++++++++++++++
 .../TestKinesisSystemConsumerOffset.java | 48 +++
 .../kinesis/consumer/TestSSPAllocator.java | 127 ++++++++
 settings.gradle | 5 +-
 22 files changed, 2619 insertions(+), 3 deletions(-)
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 59ff5f2..eddb11c 100644 --- a/build.gradle +++ b/build.gradle @@ -220,6 +220,39 @@ project(':samza-azure') { } } +project(':samza-aws') { + apply plugin: 'java' + apply plugin: 'checkstyle' + + dependencies { + compile "com.amazonaws:aws-java-sdk-kinesis:1.11.152" + compile "com.amazonaws:amazon-kinesis-client:1.7.5" + compile "com.amazonaws:amazon-kinesis-producer:0.10.0" + compile "io.dropwizard.metrics:metrics-core:3.1.2" + compile "org.codehaus.jackson:jackson-core-asl:1.9.7" + compile "org.codehaus.jackson:jackson-mapper-asl:1.9.7" + compile project(':samza-api') + compile project(":samza-core_$scalaVersion") + compile "org.slf4j:slf4j-api:$slf4jVersion" + runtime "org.apache.httpcomponents:httpclient:4.5.2" + runtime "org.apache.httpcomponents:httpcore:4.4.5" + testCompile "junit:junit:$junitVersion" + testCompile "org.mockito:mockito-all:$mockitoVersion" + } + + repositories { + maven { + url "https://repo1.maven.org/maven2/" + } + } + + checkstyle { + configFile = new File(rootDir, "checkstyle/checkstyle.xml") + toolVersion = "$checkstyleVersion" + } +} + + project(":samza-autoscaling_$scalaVersion") { apply plugin: 'scala' apply plugin: 'checkstyle' http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java new file mode 100644 index 0000000..a37cfb4 --- /dev/null +++ 
b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kinesis; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; + + +/** + * AWSCredentialsProvider implementation that takes in accessKey and secretKey directly. Requires both accessKey and + * secretKey to be non-null for it to create a BasicAWSCredentials instance. Otherwise, it creates an AWSCredentials + * instance with null keys. 
+ */ +public class KinesisAWSCredentialsProvider implements AWSCredentialsProvider { + private final AWSCredentials creds; + private static final Logger LOG = LoggerFactory.getLogger(KinesisAWSCredentialsProvider.class.getName()); + + public KinesisAWSCredentialsProvider(String accessKey, String secretKey) { + if (StringUtils.isEmpty(accessKey) || StringUtils.isEmpty(secretKey)) { + creds = new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return null; + } + + @Override + public String getAWSSecretKey() { + return null; + } + }; + LOG.info("Could not load credentials from KinesisAWSCredentialsProvider"); + } else { + creds = new BasicAWSCredentials(accessKey, secretKey); + LOG.info("Loaded credentials from KinesisAWSCredentialsProvider"); + } + } + + @Override + public AWSCredentials getCredentials() { + return creds; + } + + @Override + public void refresh() { + //no-op + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java new file mode 100644 index 0000000..a4ac40d --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kinesis; + +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProviderChain; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.ClientConfiguration; + + +/** + * Configs for Kinesis system. It contains three sets of configs: + * <ol> + * <li> Configs required by Samza Kinesis Consumer. 
+ * <li> Configs that are AWS client specific provided at system scope {@link ClientConfiguration} + * <li> Configs that are KCL specific (could be provided either at system scope or stream scope) + * {@link KinesisClientLibConfiguration} + * </ol> + */ +public class KinesisConfig extends MapConfig { + private static final Logger LOG = LoggerFactory.getLogger(KinesisConfig.class.getName()); + + private static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region"; + private static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region"; + + private static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey"; + private static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey"; + + private static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig."; + private static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost"; + private static final String DEFAULT_CONFIG_PROXY_HOST = ""; + private static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort"; + private static final int DEFAULT_CONFIG_PROXY_PORT = 0; + + private static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl."; + private static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl."; + + public KinesisConfig(Config config) { + super(config); + } + + /** + * Return a set of streams from the config for a given system. 
+ * @param system name of the system + * @return a set of streams + */ + public Set<String> getKinesisStreams(String system) { + // build stream-level configs + Config streamsConfig = subset(String.format("systems.%s.streams.", system), true); + // all properties should now start with stream name + Set<String> streams = new HashSet<>(); + streamsConfig.keySet().forEach(key -> { + String[] parts = key.split("\\.", 2); + if (parts.length != 2) { + throw new IllegalArgumentException("Ill-formatted stream config: " + key); + } + streams.add(parts[0]); + }); + return streams; + } + + /** + * Get KCL config for a given system stream. + * @param system name of the system + * @param stream name of the stream + * @param appName name of the application + * @return Stream scoped KCL configs required to build + * {@link KinesisClientLibConfiguration} + */ + public KinesisClientLibConfiguration getKinesisClientLibConfig(String system, String stream, String appName) { + ClientConfiguration clientConfig = getAWSClientConfig(system); + String workerId = appName + "-" + UUID.randomUUID(); + InitialPositionInStream startPos = InitialPositionInStream.LATEST; + AWSCredentialsProvider provider = credentialsProviderForStream(system, stream); + KinesisClientLibConfiguration kinesisClientLibConfiguration = + new KinesisClientLibConfiguration(appName, stream, provider, workerId) + .withRegionName(getRegion(system, stream).getName()) + .withKinesisClientConfig(clientConfig) + .withCloudWatchClientConfig(clientConfig) + .withDynamoDBClientConfig(clientConfig) + .withInitialPositionInStream(startPos) + .withCallProcessRecordsEvenForEmptyRecordList(true); // For health monitoring metrics. + // First, get system scoped configs for KCL and override with configs set at stream scope. 
+ setKinesisClientLibConfigs( + subset(String.format(CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, system)), kinesisClientLibConfiguration); + setKinesisClientLibConfigs(subset(String.format(CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, system, stream)), + kinesisClientLibConfiguration); + return kinesisClientLibConfiguration; + } + + /** + * Get the Kinesis secret key for the system stream + * @param system name of the system + * @param stream name of the stream + * @return Kinesis secret key + */ + protected String getStreamSecretKey(String system, String stream) { + return get(String.format(CONFIG_STREAM_SECRET_KEY, system, stream)); + } + + /** + * Get SSL socket factory for the proxy for a given system + * @param system name of the system + * @return ConnectionSocketFactory + */ + protected ConnectionSocketFactory getSSLSocketFactory(String system) { + return null; + } + + /** + * @param system name of the system + * @return {@link ClientConfiguration} which has options controlling how the client connects to kinesis + * (eg: proxy settings, retry counts, etc) + */ + ClientConfiguration getAWSClientConfig(String system) { + ClientConfiguration awsClientConfig = new ClientConfiguration(); + setAwsClientConfigs(subset(String.format(CONFIG_AWS_CLIENT_CONFIG, system)), awsClientConfig); + awsClientConfig.getApacheHttpClientConfig().setSslSocketFactory(getSSLSocketFactory(system)); + return awsClientConfig; + } + + /** + * Get the proxy host as a system level config. This is needed when + * users need to go through a proxy for the Kinesis connections. + * @param system name of the system + * @return proxy host name or empty string if not defined + */ + String getProxyHost(String system) { + return get(String.format(CONFIG_PROXY_HOST, system), DEFAULT_CONFIG_PROXY_HOST); + } + + /** + * Get the proxy port number as a system level config. This is needed when + * users need to go through a proxy for the Kinesis connections. 
+ * @param system name of the system + * @return proxy port number or 0 if not defined + */ + int getProxyPort(String system) { + return getInt(String.format(CONFIG_PROXY_PORT, system), DEFAULT_CONFIG_PROXY_PORT); + } + + /** + * Get the Kinesis region for the system stream + * @param system name of the system + * @param stream name of the stream + * @return Kinesis region + */ + Region getRegion(String system, String stream) { + String name = get(String.format(CONFIG_STREAM_REGION, system, stream), + get(String.format(CONFIG_SYSTEM_REGION, system))); + return Region.getRegion(Regions.fromName(name)); + } + + /** + * Get the Kinesis access key name for the system stream + * @param system name of the system + * @param stream name of the stream + * @return Kinesis access key + */ + String getStreamAccessKey(String system, String stream) { + return get(String.format(CONFIG_STREAM_ACCESS_KEY, system, stream)); + } + + /** + * Get the appropriate CredentialProvider for a given system stream. + * @param system name of the system + * @param stream name of the stream + * @return AWSCredentialsProvider + */ + AWSCredentialsProvider credentialsProviderForStream(String system, String stream) { + // Try to load credentials in the following order: + // 1. Access key from the config and passed in secretKey + // 2. 
From the default credential provider chain (environment variables, system properties, AWS profile file, etc) + return new AWSCredentialsProviderChain( + new KinesisAWSCredentialsProvider(getStreamAccessKey(system, stream), getStreamSecretKey(system, stream)), + new DefaultAWSCredentialsProviderChain()); + } + + private void setAwsClientConfigs(Config config, ClientConfiguration clientConfig) { + for (Entry<String, String> entry : config.entrySet()) { + boolean found = false; + String key = entry.getKey(); + String value = entry.getValue(); + if (StringUtils.isEmpty(value)) { + continue; + } + for (Method method : ClientConfiguration.class.getMethods()) { + // For each property invoke the corresponding setter, if it exists + if (method.getName().equals("set" + key)) { + found = true; + Class<?> type = method.getParameterTypes()[0]; + try { + if (type == long.class) { + method.invoke(clientConfig, Long.valueOf(value)); + } else if (type == int.class) { + method.invoke(clientConfig, Integer.valueOf(value)); + } else if (type == boolean.class) { + method.invoke(clientConfig, Boolean.valueOf(value)); + } else if (type == String.class) { + method.invoke(clientConfig, value); + } + LOG.info("Loaded property " + key + " = " + value); + break; + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("Error trying to set field %s with the value '%s'", key, value), e); + } + } + } + if (!found) { + LOG.warn("Property " + key + " ignored as there is no corresponding set method"); + } + } + } + + private void setKinesisClientLibConfigs(Map<String, String> config, KinesisClientLibConfiguration kinesisLibConfig) { + for (Entry<String, String> entry : config.entrySet()) { + boolean found = false; + String key = entry.getKey(); + String value = entry.getValue(); + if (StringUtils.isEmpty(value)) { + continue; + } + for (Method method : KinesisClientLibConfiguration.class.getMethods()) { + if (method.getName().equals("with" + key)) { + found = true; + Class<?> 
type = method.getParameterTypes()[0]; + try { + if (type == long.class) { + method.invoke(kinesisLibConfig, Long.valueOf(value)); + } else if (type == int.class) { + method.invoke(kinesisLibConfig, Integer.valueOf(value)); + } else if (type == boolean.class) { + method.invoke(kinesisLibConfig, Boolean.valueOf(value)); + } else if (type == String.class) { + method.invoke(kinesisLibConfig, value); + } else if (type == InitialPositionInStream.class) { + method.invoke(kinesisLibConfig, InitialPositionInStream.valueOf(value.toUpperCase())); + } + LOG.info("Loaded property " + key + " = " + value); + break; + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("Error trying to set field %s with the value '%s'", key, value), e); + } + } + } + if (!found) { + LOG.warn("Property " + key + " ignored as there is no corresponding set method"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java new file mode 100644 index 0000000..4843276 --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kinesis; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.model.StreamDescription; + + +/** + * A Kinesis-based implementation of SystemAdmin. 
+ */ +public class KinesisSystemAdmin implements SystemAdmin { + + private static final SystemStreamMetadata.SystemStreamPartitionMetadata SYSTEM_STREAM_PARTITION_METADATA = + new SystemStreamMetadata.SystemStreamPartitionMetadata(ExtendedSequenceNumber.TRIM_HORIZON.getSequenceNumber(), + ExtendedSequenceNumber.LATEST.getSequenceNumber(), + ExtendedSequenceNumber.LATEST.getSequenceNumber()); + + private static final Logger LOG = LoggerFactory.getLogger(KinesisSystemAdmin.class.getName()); + + private final String system; + private final KinesisConfig kConfig; + + public KinesisSystemAdmin(String system, KinesisConfig kConfig) { + this.system = system; + this.kConfig = kConfig; + } + + /** + * Source of truth for checkpointing is always kinesis and the offsets written to samza checkpoint topic are ignored. + * Hence, return null for the getOffsetsAfter for a supplied map of ssps. + */ + @Override + public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) { + Map<SystemStreamPartition, String> offsetsAfter = new HashMap<>(); + + for (SystemStreamPartition systemStreamPartition : offsets.keySet()) { + offsetsAfter.put(systemStreamPartition, null); + } + + return offsetsAfter; + } + + /** + * Source of truth for checkpointing is always kinesis and the offsets given by samza are always ignored by KCL. + * Hence, return a placeholder for each ssp. 
+ */ + @Override + public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) { + return streamNames.stream().collect(Collectors.toMap(Function.identity(), this::createSystemStreamMetadata)); + } + + private SystemStreamMetadata createSystemStreamMetadata(String stream) { + LOG.info("create stream metadata for stream {} based on aws stream", stream); + Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadata = new HashMap<>(); + AmazonKinesisClient client = null; + + try { + ClientConfiguration clientConfig = kConfig.getAWSClientConfig(system); + AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard() + .withCredentials(kConfig.credentialsProviderForStream(system, stream)) + .withClientConfiguration(clientConfig); + builder.setRegion(kConfig.getRegion(system, stream).getName()); + client = (AmazonKinesisClient) builder.build(); + StreamDescription desc = client.describeStream(stream).getStreamDescription(); + IntStream.range(0, desc.getShards().size()) + .forEach(i -> metadata.put(new Partition(i), SYSTEM_STREAM_PARTITION_METADATA)); + } catch (Exception e) { + String errMsg = "couldn't load metadata for stream " + stream; + LOG.error(errMsg, e); + throw new SamzaException(errMsg, e); + } finally { + if (client != null) { + client.shutdown(); + } + } + + return new SystemStreamMetadata(stream, metadata); + } + + /** + * Checkpoints are written to KCL and is always the source of truth. Format for Samza offsets is different from + * that of Kinesis checkpoint. Samza offsets are not comparable. Hence, return null. 
+ */ + @Override + public Integer offsetComparator(String offset1, String offset2) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java new file mode 100644 index 0000000..558e871 --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.system.kinesis; + +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.StreamConfig; +import org.apache.samza.config.TaskConfigJava; +import org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemProducer; +import org.apache.samza.system.SystemStream; + +import org.apache.samza.system.kinesis.consumer.KinesisSystemConsumer; + + +/** + * A Kinesis-based implementation of SystemFactory. + */ +public class KinesisSystemFactory implements SystemFactory { + @Override + public SystemConsumer getConsumer(String system, Config config, MetricsRegistry registry) { + KinesisConfig kConfig = new KinesisConfig(config); + return new KinesisSystemConsumer(system, kConfig, registry); + } + + @Override + public SystemProducer getProducer(String system, Config config, MetricsRegistry registry) { + return null; + } + + @Override + public SystemAdmin getAdmin(String system, Config config) { + validateConfig(system, config); + KinesisConfig kConfig = new KinesisConfig(config); + return new KinesisSystemAdmin(system, kConfig); + } + + protected void validateConfig(String system, Config config) { + // Kinesis system does not support groupers other than AllSspToSingleTaskGrouper + JobConfig jobConfig = new JobConfig(config); + if (!jobConfig.getSystemStreamPartitionGrouperFactory().equals( + AllSspToSingleTaskGrouperFactory.class.getCanonicalName())) { + String errMsg = String.format("Incorrect Grouper %s used for KinesisSystemConsumer %s. 
Please set the %s config" + + " to %s.", jobConfig.getSystemStreamPartitionGrouperFactory(), system, + JobConfig.SSP_GROUPER_FACTORY(), AllSspToSingleTaskGrouperFactory.class.getCanonicalName()); + throw new ConfigException(errMsg); + } + + // Kinesis streams cannot be configured as broadcast streams + TaskConfigJava taskConfig = new TaskConfigJava(config); + if (taskConfig.getBroadcastSystemStreams().stream().anyMatch(ss -> system.equals(ss.getSystem()))) { + throw new ConfigException("Kinesis streams cannot be configured as broadcast streams."); + } + + // Kinesis streams cannot be configured as bootstrap streams + KinesisConfig kConfig = new KinesisConfig(config); + kConfig.getKinesisStreams(system).forEach(stream -> { + StreamConfig streamConfig = new StreamConfig(kConfig); + SystemStream ss = new SystemStream(system, stream); + if (streamConfig.getBootstrapEnabled(ss)) { + throw new ConfigException("Kinesis streams cannot be configured as bootstrap streams."); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java new file mode 100644 index 0000000..95e6b6a --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kinesis.consumer; + +import java.util.Date; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; + + +/** + * Kinesis record with payload and some metadata. + */ +public class KinesisIncomingMessageEnvelope extends IncomingMessageEnvelope { + private final String shardId; + private final String sequenceNumber; + private final Date approximateArrivalTimestamp; + + public KinesisIncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, + Object message, String shardId, String sequenceNumber, Date approximateArrivalTimestamp) { + super(systemStreamPartition, offset, key, message); + this.shardId = shardId; + this.sequenceNumber = sequenceNumber; + this.approximateArrivalTimestamp = approximateArrivalTimestamp; + } + + public String getShardId() { + return shardId; + } + + public String getSequenceNumber() { + return sequenceNumber; + } + + public Date getApproximateArrivalTimestamp() { + return approximateArrivalTimestamp; + } + + @Override + public String toString() { + return "KinesisIncomingMessageEnvelope:: shardId:" + shardId + ", sequenceNumber:" + sequenceNumber + + ", approximateArrivalTimestamp:" + approximateArrivalTimestamp + ", message:" + getMessage(); + } + +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java new file mode 100644 index 0000000..53ff27f --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.system.kinesis.consumer; + +import java.util.List; + +import org.apache.commons.lang.Validate; +import org.apache.samza.SamzaException; +import org.apache.samza.system.SystemStreamPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; +import com.amazonaws.services.kinesis.model.Record; + + +/** + * Record processor for AWS kinesis stream. It does the following: + * <ul> + * <li> when a shard is assigned by KCL in initialize API, it asks and gets an ssp from sspAllocator. + * <li> when records are received in processRecords API, it translates them to IncomingMessageEnvelope and enqueues + * the resulting envelope in the appropriate blocking buffer queue. + * <li> when checkpoint API is called by samza, it checkpoints via KCL to Kinesis. + * <li> when shutdown API is called by KCL, based on the terminate reason, it takes necessary action. + * </ul> + * + * initialize, processRecords and shutdown APIs are never called concurrently on a processor instance. However, + * checkpoint API could be called by Samza thread while processRecords and shutdown callback APIs are invoked by KCL. 
+ * Please note that the APIs for different record processor instances could be called concurrently. + */

public class KinesisRecordProcessor implements IRecordProcessor {

  private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordProcessor.class.getName());
  static final long POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS = 1000;

  private final SystemStreamPartition ssp;

  private KinesisRecordProcessorListener listener;

  // Written by KCL callbacks (initialize/processRecords) and read by checkpoint()/getShardId(), which may run on a
  // different thread; volatile makes the writes visible to that reader.
  private volatile String shardId;
  private volatile IRecordProcessorCheckpointer checkpointer;
  private volatile ExtendedSequenceNumber initSeqNumber;

  private volatile ExtendedSequenceNumber lastProcessedRecordSeqNumber;
  private volatile ExtendedSequenceNumber lastCheckpointedRecordSeqNumber;

  // Only touched from processRecords() and shutdown().
  private boolean shutdownRequested = false;

  KinesisRecordProcessor(SystemStreamPartition ssp, KinesisRecordProcessorListener listener) {
    this.ssp = ssp;
    this.listener = listener;
  }

  /**
   * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
   * (via processRecords). Records the shard id and the initial sequence number of the assignment.
   *
   * @param initializationInput Provides information related to initialization
   */
  @Override
  public void initialize(InitializationInput initializationInput) {
    Validate.isTrue(listener != null, "There is no listener set for the processor.");
    initSeqNumber = initializationInput.getExtendedSequenceNumber();
    shardId = initializationInput.getShardId();
    LOG.info("Initialization done for {} with sequence {}", this,
        initializationInput.getExtendedSequenceNumber().getSequenceNumber());
  }

  /**
   * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
   * application. Upon fail over, the new instance will get records with sequence number greater than the checkpoint
   * position for each partition key.
   *
   * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
   *                            to them (eg checkpointing).
   */
  @Override
  public void processRecords(ProcessRecordsInput processRecordsInput) {
    // KCL does not send any records to the processor that was shutdown.
    Validate.isTrue(!shutdownRequested,
        String.format("KCL returned records after shutdown is called on the processor %s.", this));
    // KCL always gives reference to the same checkpointer instance for a given processor instance.
    checkpointer = processRecordsInput.getCheckpointer();
    List<Record> records = processRecordsInput.getRecords();
    // Empty records are expected when KCL config has CallProcessRecordsEvenForEmptyRecordList set to true.
    if (!records.isEmpty()) {
      lastProcessedRecordSeqNumber = new ExtendedSequenceNumber(records.get(records.size() - 1).getSequenceNumber());
      listener.onReceiveRecords(ssp, records, processRecordsInput.getMillisBehindLatest());
    }
  }

  /**
   * Invoked by the Samza thread to commit checkpoint for the shard owned by the record processor instance.
   * The request is ignored (with a warning) when the sequence number is smaller than the initial sequence number,
   * when no checkpointer has been received yet, or when KCL reports a {@link ShutdownException}.
   *
   * @param seqNumber sequenceNumber to checkpoint for the shard owned by this processor instance.
   * @throws SamzaException if KCL reports an {@link InvalidStateException} or a {@link ThrottlingException}; the
   *                        original exception is attached as the cause.
   */
  public void checkpoint(String seqNumber) {
    ExtendedSequenceNumber seqNumberToCheckpoint = new ExtendedSequenceNumber(seqNumber);
    if (initSeqNumber.compareTo(seqNumberToCheckpoint) > 0) {
      LOG.warn("Samza called checkpoint with seqNumber {} smaller than initial seqNumber {} for {}. Ignoring it!",
          seqNumber, initSeqNumber, this);
      return;
    }

    // Read the volatile field once so the null check and the call below see the same instance.
    IRecordProcessorCheckpointer currentCheckpointer = checkpointer;
    if (currentCheckpointer == null) {
      // checkpointer could be null as a result of shard re-assignment before the first record is processed.
      LOG.warn("Ignoring checkpointing for {} with seqNumber {} because of re-assignment.", this, seqNumber);
      return;
    }

    try {
      currentCheckpointer.checkpoint(seqNumber);
      lastCheckpointedRecordSeqNumber = seqNumberToCheckpoint;
    } catch (ShutdownException e) {
      // This can happen as a result of shard re-assignment.
      String msg = String.format("Checkpointing %s with seqNumber %s failed with exception. Dropping the checkpoint.",
          this, seqNumber);
      LOG.warn(msg, e);
    } catch (InvalidStateException e) {
      // This can happen when KCL encounters issues with internal state, eg: dynamoDB table is not found
      String msg =
          String.format("Checkpointing %s with seqNumber %s failed with exception.", this, seqNumber);
      LOG.error(msg, e);
      throw new SamzaException(msg, e);
    } catch (ThrottlingException e) {
      // Throttling is handled by KCL via the client lib configuration properties. If we get an exception in spite of
      // the throttling back-off behavior, surface it so that the configs can be adjusted.
      String msg = String.format("Checkpointing %s with seqNumber %s failed with exception. Checkpoint interval is"
          + " too aggressive for the provisioned throughput of the dynamoDB table where the checkpoints are stored."
          + " Either reduce the checkpoint interval -or- increase the throughput of dynamoDB table.", this,
          seqNumber);
      // Keep the throttling exception as the cause so the stack trace is not lost.
      throw new SamzaException(msg, e);
    }
  }

  /**
   * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
   * RecordProcessor instance. For {@link ShutdownReason#TERMINATE} it blocks until the last processed record has
   * been checkpointed and then commits the final checkpoint; in all cases the listener is notified at the end.
   *
   * @param shutdownInput Provides information and capabilities (eg checkpointing) related to shutdown of this record
   *                      processor.
   */
  @Override
  public void shutdown(ShutdownInput shutdownInput) {
    LOG.info("Shutting down {} with reason:{}", this, shutdownInput.getShutdownReason());

    Validate.isTrue(!shutdownRequested, String.format("KCL called shutdown more than once for processor %s.", this));
    shutdownRequested = true;
    // shutdown reason TERMINATE indicates that the shard is closed due to re-sharding. It also indicates that all the
    // records from the shard have been delivered to the consumer and the consumer is expected to checkpoint the
    // progress.
    if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
      // We need to ensure that all records are processed and checkpointed before going ahead and marking the processing
      // complete by calling checkpoint() on KCL. We need to checkpoint the completion state for parent shard, for KCL
      // to consume from the child shard(s).
      try {
        LOG.info("Waiting for all the records for {} to be processed.", this);
        // Let's poll periodically and block until the last processed record is checkpointed. Also handle the case
        // where there are no records received for the processor, in which case the lastProcessedRecordSeqNumber will
        // be null.
        while (lastProcessedRecordSeqNumber != null
            && !lastProcessedRecordSeqNumber.equals(lastCheckpointedRecordSeqNumber)) {
          Thread.sleep(POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS);
        }
        LOG.info("Final checkpoint for {} before shutting down.", this);
        shutdownInput.getCheckpointer().checkpoint();
      } catch (InterruptedException e) {
        // Restore the interrupt flag so that the owning thread can still observe the interruption.
        Thread.currentThread().interrupt();
        LOG.warn("Interrupted while waiting to commit the final checkpoint in the parent shard {}", this, e);
      } catch (Exception e) {
        LOG.warn("An error occurred while committing the final checkpoint in the parent shard {}", this, e);
      }
    }
    listener.onShutdown(ssp);
  }

  /** @return the shard id received in {@link #initialize(InitializationInput)}, or null before initialization */
  String getShardId() {
    return shardId;
  }

  @Override
  public String toString() {
    return String.format("KinesisRecordProcessor: ssp %s shard %s hashCode %s", ssp, shardId, hashCode());
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java new file mode 100644 index 0000000..72d86b9 --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.samza.system.kinesis.consumer;

import java.util.List;

import org.apache.samza.system.SystemStreamPartition;

import com.amazonaws.services.kinesis.model.Record;


/**
 * Callback contract between a {@link KinesisRecordProcessor} and the consumer that owns it: the processor reports
 * received records and announces when it is ready to shut down.
 */
public interface KinesisRecordProcessorListener {
  /**
   * Called from
   * {@link KinesisRecordProcessor#processRecords(com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput)}
   * whenever the processor has received records.
   *
   * @param ssp Samza partition to which the records belong
   * @param records List of kinesis records
   * @param millisBehindLatest Time lag of the batch of records with respect to the tip of the stream
   */
  void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest);

  /**
   * Called from
   * {@link KinesisRecordProcessor#shutdown(com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput)}
   * once the processor is ready to shut down.
   *
   * @param ssp Samza partition for which the shutdown is invoked
   */
  void onShutdown(SystemStreamPartition ssp);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java new file mode 100644 index 0000000..6afffd3 --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.samza.system.kinesis.consumer; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; + +import org.apache.commons.lang.Validate; +import org.apache.samza.SamzaException; +import org.apache.samza.checkpoint.CheckpointListener; +import org.apache.samza.config.JobConfig; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.system.kinesis.KinesisConfig; +import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics; +import org.apache.samza.util.BlockingEnvelopeMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; +import com.amazonaws.services.kinesis.model.Record; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + + +/** + * The system consumer for Kinesis, extending the {@link BlockingEnvelopeMap}. + * + * The system consumer creates a KinesisWorker per stream in it's own thread by providing a RecordProcessorFactory. + * Kinesis Client Library (KCL) uses this factory to instantiate a KinesisRecordProcessor for each shard in the Kinesis + * stream. KCL pushes data records to the appropriate record processor and the processor is responsible for processing + * the resulting records and place them into a blocking queue in {@link BlockingEnvelopeMap}. + * + * <pre> + * {@code + * Shard1 +----------------------+ + * . 
--------------------> |KinesisRecordProcessor| + * Stream1 | Shard2 +----------------------+ + * +-------------+ +-----------------------------+ +----------------------+ + * .--------------->| Worker |---->| RecordProcessorFactory | ----> |KinesisRecordProcessor| + * | +-------------+ +-------------+---------------+ +----------------------+ + * | | Shard3 +----------------------+ + * | . --------------------> |KinesisRecordProcessor| + * | +----------------------+ + * | Stream2 + * +---------------------+ +-------------+ +-----------------------------+ +-------+ + * |KinesisSystemConsumer|---->| Worker |---->| RecordProcessorFactory |------->| ... | + * +---------------------+ +-------------+ +-----------------------------+ +-------+ + * | + * | + * | + * | + * | +-----------+ + * . -------------->| ... | + * +-----------+ + * } + * </pre> + * Since KinesisSystemConsumer uses KCL, the checkpoint state is stored in a dynamoDB table which is maintained by KCL. + * KinesisSystemConsumer implements CheckpointListener to commit checkpoints via KCL. 
+ */

public class KinesisSystemConsumer extends BlockingEnvelopeMap implements CheckpointListener, KinesisRecordProcessorListener {

  private static final int MAX_BLOCKING_QUEUE_SIZE = 100;
  private static final Logger LOG = LoggerFactory.getLogger(KinesisSystemConsumer.class.getName());

  private final String system;
  private final KinesisConfig kConfig;
  private final KinesisSystemConsumerMetrics metrics;
  private final SSPAllocator sspAllocator;

  // Names of the streams seen in register(); one worker is launched per entry in start().
  private final Set<String> streams = new HashSet<>();
  // Processor currently attached to each ssp; entries are added by the record processor factory and removed in
  // onShutdown().
  private final Map<SystemStreamPartition, KinesisRecordProcessor> processors = new ConcurrentHashMap<>();
  private final List<Worker> workers = new LinkedList<>();

  // Created in start(); stays null when start() was never called or there was no stream to consume.
  private ExecutorService executorService;

  // Failure raised inside the record processor factory; rethrown from poll().
  private volatile Exception callbackException;

  /**
   * @param systemName name of the Samza system this consumer serves
   * @param kConfig Kinesis configuration used to build the KCL configuration for every stream
   * @param registry registry in which the consumer metrics are created
   */
  public KinesisSystemConsumer(String systemName, KinesisConfig kConfig, MetricsRegistry registry) {
    super(registry, System::currentTimeMillis, null);
    this.system = systemName;
    this.kConfig = kConfig;
    this.metrics = new KinesisSystemConsumerMetrics(registry);
    this.sspAllocator = new SSPAllocator();
  }

  /** Creates the per-ssp buffer, bounded to {@code MAX_BLOCKING_QUEUE_SIZE} envelopes. */
  @Override
  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
    return new LinkedBlockingQueue<>(MAX_BLOCKING_QUEUE_SIZE);
  }

  /**
   * Delegates to the parent implementation. Any exception is logged and the interrupt flag of the calling thread is
   * set instead of propagating the exception.
   */
  @Override
  protected void put(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
    try {
      super.put(ssp, envelope);
    } catch (Exception e) {
      LOG.error("Exception while putting record. Shutting down SystemStream {}", ssp.getSystemStream(), e);
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Remembers the stream of the ssp, adds the ssp to the allocator's free pool and registers it with the parent.
   *
   * @param ssp partition to register
   * @param offset passed on to the parent; logged as being ignored
   */
  @Override
  public void register(SystemStreamPartition ssp, String offset) {
    LOG.info("Register called with ssp {} and offset {}. Offset will be ignored.", ssp, offset);
    String stream = ssp.getStream();
    streams.add(stream);
    sspAllocator.free(ssp);
    super.register(ssp, offset);
  }

  /**
   * Initializes the metrics and launches one KCL worker per registered stream, each on its own thread of a shared
   * fixed-size pool. Nothing is launched when no stream has been registered.
   */
  @Override
  public void start() {
    LOG.info("Start samza consumer for system {}.", system);

    metrics.initializeMetrics(streams);

    if (streams.isEmpty()) {
      // Executors.newFixedThreadPool rejects a pool size of zero, so there is nothing to launch in this case.
      LOG.info("No streams registered for system {}. No kinesis workers will be started.", system);
      return;
    }

    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("kinesis-worker-thread-" + system + "-%d")
        .build();
    // launch kinesis workers in separate threads, one per stream
    executorService = Executors.newFixedThreadPool(streams.size(), namedThreadFactory);

    for (String stream : streams) {
      // KCL Dynamodb table is used for storing the state of processing. By default, the table name is the same as the
      // application name. Dynamodb table name must be unique for a given account and region (even across different
      // streams). So, let's create the default one with the combination of job name, job id and stream name. The table
      // name could be changed by providing a different TableName via KCL specific config.
      String kinesisApplicationName =
          kConfig.get(JobConfig.JOB_NAME()) + "-" + kConfig.get(JobConfig.JOB_ID()) + "-" + stream;

      Worker worker = new Worker.Builder()
          .recordProcessorFactory(createRecordProcessorFactory(stream))
          .config(kConfig.getKinesisClientLibConfig(system, stream, kinesisApplicationName))
          .build();

      workers.add(worker);

      // run the worker on its own thread of the shared pool
      executorService.execute(worker);
      LOG.info("Started worker for system {} stream {}.", system, stream);
    }
  }

  /**
   * Polls the parent buffer, unless the record processor factory has recorded a failure.
   *
   * @throws SamzaException wrapping the exception recorded by the record processor factory, if any
   */
  @Override
  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
      Set<SystemStreamPartition> ssps, long timeout) throws InterruptedException {
    if (callbackException != null) {
      throw new SamzaException(callbackException);
    }
    return super.poll(ssps, timeout);
  }

  /** Shuts down every worker and, if one was created in {@link #start()}, the executor service running them. */
  @Override
  public void stop() {
    LOG.info("Stop samza consumer for system {}.", system);
    workers.forEach(Worker::shutdown);
    workers.clear();
    // The executor only exists once start() has launched workers; guard against stop() being called without it.
    if (executorService != null) {
      executorService.shutdownNow();
    }
    LOG.info("Kinesis system consumer executor service for system {} is shutdown.", system);
  }

  // package-private for tests
  IRecordProcessorFactory createRecordProcessorFactory(String stream) {
    return () -> {
      // This code is executed in Kinesis thread context.
      try {
        SystemStreamPartition ssp = sspAllocator.allocate(stream);
        KinesisRecordProcessor processor = new KinesisRecordProcessor(ssp, KinesisSystemConsumer.this);
        KinesisRecordProcessor prevProcessor = processors.put(ssp, processor);
        Validate.isTrue(prevProcessor == null, String.format("Adding new kinesis record processor %s while the"
            + " previous processor %s for the same ssp %s is still active.", processor, prevProcessor, ssp));
        return processor;
      } catch (Exception e) {
        // This exception is the result of kinesis dynamic shard splits due to which sspAllocator ran out of free ssps.
        // Set the failed state in consumer which will eventually result in stopping the container. A manual job restart
        // will be required at this point. After the job restart, the newly created shards will be discovered and enough
        // ssps will be added to sspAllocator freePool.
        callbackException = e;
        throw new SamzaException(e);
      }
    };
  }

  /**
   * Forwards each checkpointed offset to the processor that owns the ssp. A checkpoint is dropped when no processor
   * is registered for the ssp or when the processor owns a different shard than the one named in the offset.
   *
   * @param sspOffsets offsets per ssp, in the serialized form of {@link KinesisSystemConsumerOffset}
   */
  @Override
  public void onCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
    LOG.info("onCheckpoint called with sspOffsets {}", sspOffsets);
    sspOffsets.forEach((ssp, offset) -> {
      KinesisRecordProcessor processor = processors.get(ssp);
      KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);
      if (processor == null) {
        LOG.info("Kinesis Processor is not alive for ssp {}. This could be the result of rebalance. Hence dropping the"
            + " checkpoint {}.", ssp, offset);
      } else if (!kinesisOffset.getShardId().equals(processor.getShardId())) {
        LOG.info("KinesisProcessor for ssp {} currently owns shard {} while the checkpoint is for shard {}. This could"
            + " be the result of rebalance. Hence dropping the checkpoint {}.", ssp, processor.getShardId(),
            kinesisOffset.getShardId(), offset);
      } else {
        processor.checkpoint(kinesisOffset.getSeqNumber());
      }
    });
  }

  /** Updates the lag metric of the stream and enqueues one envelope per record for the given ssp. */
  @Override
  public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
    metrics.updateMillisBehindLatest(ssp.getStream(), millisBehindLatest);
    records.forEach(record -> put(ssp, translate(ssp, record)));
  }

  /** Forgets the processor of the ssp and returns the ssp to the allocator's free pool. */
  @Override
  public void onShutdown(SystemStreamPartition ssp) {
    processors.remove(ssp);
    sspAllocator.free(ssp);
  }

  /** Converts a Kinesis record into an envelope whose offset encodes the shard id and the sequence number. */
  private IncomingMessageEnvelope translate(SystemStreamPartition ssp, Record record) {
    String shardId = processors.get(ssp).getShardId();
    byte[] payload = new byte[record.getData().remaining()];

    // Metrics are updated before the payload is copied out of the record's buffer.
    metrics.updateMetrics(ssp.getStream(), record);
    record.getData().get(payload);
    KinesisSystemConsumerOffset offset = new KinesisSystemConsumerOffset(shardId, record.getSequenceNumber());
    return new KinesisIncomingMessageEnvelope(ssp, offset.toString(), record.getPartitionKey(),
        payload, shardId, record.getSequenceNumber(), record.getApproximateArrivalTimestamp());
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java new file mode 100644 index 0000000..13296ca --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kinesis.consumer; + +import org.apache.samza.SamzaException; +import org.apache.samza.serializers.JsonSerdeV2; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + + +/** + * Kinesis system consumer related checkpoint information that is stored in the IncomingMessageEnvelope offset. + * + * It contains the following metadata: + * <ul> + * <li> shardId: Kinesis stream shardId. + * <li> seqNumber: sequence number in the above shard. + * </ul> + * + * Please note that the source of truth for checkpointing is the AWS dynamoDB table corresponding to the application. + * The offset that is stored in Samza checkpoint topic is not used. 
+ */
public class KinesisSystemConsumerOffset {

  @JsonProperty("shardId")
  private String shardId;
  @JsonProperty("seqNumber")
  private String seqNumber;

  /**
   * @param shardId Kinesis stream shardId
   * @param seqNumber sequence number in the above shard
   */
  @JsonCreator
  KinesisSystemConsumerOffset(@JsonProperty("shardId") String shardId,
      @JsonProperty("seqNumber") String seqNumber) {
    this.shardId = shardId;
    this.seqNumber = seqNumber;
  }

  String getShardId() {
    return shardId;
  }

  String getSeqNumber() {
    return seqNumber;
  }

  /**
   * Deserializes an offset from the JSON text produced by {@link #toString()}.
   *
   * @param metadata JSON form of the offset
   * @return the parsed offset
   * @throws SamzaException if {@code metadata} is null
   */
  static KinesisSystemConsumerOffset parse(String metadata) {
    if (metadata == null) {
      // Keep reporting a null input as a SamzaException, as the former catch-all around getBytes did.
      throw new SamzaException(new NullPointerException("Kinesis offset metadata is null"));
    }
    JsonSerdeV2<KinesisSystemConsumerOffset> jsonSerde = new JsonSerdeV2<>(KinesisSystemConsumerOffset.class);
    // Fully qualified so that the file's import block does not need to change.
    byte[] bytes = metadata.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    return jsonSerde.fromBytes(bytes);
  }

  /**
   * Serializes this offset with {@link JsonSerdeV2}. The bytes are decoded as UTF-8, the same charset
   * {@link #parse(String)} uses to encode its input, so the round trip does not depend on the platform default.
   */
  @SuppressWarnings("unchecked")
  @Override
  public String toString() {
    JsonSerdeV2<KinesisSystemConsumerOffset> jsonSerde = new JsonSerdeV2<>(KinesisSystemConsumerOffset.class);
    return new String(jsonSerde.toBytes(this), java.nio.charset.StandardCharsets.UTF_8);
  }

  /** Two offsets are equal when both their shard ids and their sequence numbers are equal (nulls included). */
  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (!(o instanceof KinesisSystemConsumerOffset)) {
      return false;
    }

    KinesisSystemConsumerOffset that = (KinesisSystemConsumerOffset) o;
    return java.util.Objects.equals(shardId, that.getShardId())
        && java.util.Objects.equals(seqNumber, that.getSeqNumber());
  }

  /**
   * Null-safe counterpart of {@link #equals(Object)}: a null field contributes 0 instead of throwing, and the value
   * for non-null fields is the same {@code 31 * shardId.hashCode() + seqNumber.hashCode()} as before.
   */
  @Override
  public int hashCode() {
    int result = java.util.Objects.hashCode(shardId);
    result = 31 * result + java.util.Objects.hashCode(seqNumber);
    return result;
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java new file mode 100644 index 0000000..6caf760 --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */

package org.apache.samza.system.kinesis.consumer;


/**
 * Checked exception signalling that SSPAllocator is unable to allocate an SSP.
 */
public class NoAvailablePartitionException extends Exception {

  private static final long serialVersionUID = 1L;

  /**
   * @param message description of the failure
   */
  public NoAvailablePartitionException(String message) {
    super(message);
  }

  /**
   * @param message description of the failure
   * @param cause exception that led to this one
   */
  public NoAvailablePartitionException(String message, Exception cause) {
    super(message, cause);
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java new file mode 100644 index 0000000..4b7cff8 --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */

package org.apache.samza.system.kinesis.consumer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.Validate;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Hands out Samza SystemStreamPartitions (SSPs) from a per-stream free pool. Two operations are offered:
 * <ul>
 *   <li> allocate: takes one free ssp of the given stream out of the pool and returns it.
 *   <li> free: puts an ssp (back) into the pool of its stream.
 * </ul>
 * When the pool of a stream is empty, allocate throws NoAvailablePartitionException. Allocator could run out of free
 * ssps as a result of dynamic shard splits.
 */
class SSPAllocator {
  private static final Logger LOG = LoggerFactory.getLogger(SSPAllocator.class.getName());

  // Free ssps, keyed by stream name.
  private final Map<String, Set<SystemStreamPartition>> availableSsps = new HashMap<>();

  /**
   * Removes one ssp from the free pool of the stream and returns it.
   *
   * @param stream name of the stream an ssp is needed for
   * @return an ssp that was free until this call
   * @throws NoAvailablePartitionException if the pool of the stream is empty
   */
  synchronized SystemStreamPartition allocate(String stream) throws NoAvailablePartitionException {
    Set<SystemStreamPartition> freePool = availableSsps.get(stream);
    Validate.isTrue(freePool != null,
        String.format("availableSsps is null for stream %s", stream));

    if (freePool.isEmpty()) {
      // Set a flag in system consumer so that it could throw an exception in the subsequent poll.
      throw new NoAvailablePartitionException(String.format("More shards detected for stream %s than initially"
          + " registered. Could be the result of dynamic resharding.", stream));
    }

    SystemStreamPartition allocated = freePool.iterator().next();
    freePool.remove(allocated);

    LOG.info("Number of unassigned partitions for system-stream {} is {}.", allocated.getSystemStream(),
        availableSsps.get(allocated.getStream()).size());
    return allocated;
  }

  /**
   * Adds the ssp to the free pool of its stream, creating the pool on first use. Validation fails if the ssp is
   * already in the pool.
   *
   * @param ssp partition to return to the pool
   */
  synchronized void free(SystemStreamPartition ssp) {
    Set<SystemStreamPartition> freePool = availableSsps.computeIfAbsent(ssp.getStream(), p -> new HashSet<>());
    boolean added = freePool.add(ssp);
    Validate.isTrue(added, String.format("Ssp %s is already in free pool.", ssp));

    LOG.info("Number of unassigned partitions for system-stream {} is {}.", ssp.getSystemStream(),
        freePool.size());
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java new file mode 100644 index 0000000..2f42981 --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kinesis.metrics; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsRegistry; + +import com.amazonaws.services.kinesis.model.Record; + + +/** + * KinesisSystemConsumerMetrics class has per-stream metrics and aggregate metrics across kinesis consumers + */ + +public class KinesisSystemConsumerMetrics { + + private final MetricsRegistry registry; + + // Aggregate metrics across all kinesis system consumers + private static Counter aggEventReadRate = null; + private static Counter aggEventByteReadRate = null; + private static SamzaHistogram aggReadLatency = null; + private static SamzaHistogram aggMillisBehindLatest = null; + + // Per-stream metrics + private Map<String, Counter> eventReadRates; + private Map<String, Counter> eventByteReadRates; + private Map<String, SamzaHistogram> readLatencies; + private Map<String, SamzaHistogram> millisBehindLatest; + + private static final Object LOCK = new Object(); + + private static final String AGGREGATE = "aggregate"; + private static final String EVENT_READ_RATE = "eventReadRate"; + private static final String EVENT_BYTE_READ_RATE = "eventByteReadRate"; + private static final String READ_LATENCY = "readLatency"; + private static final String MILLIS_BEHIND_LATEST = "millisBehindLatest"; + + public KinesisSystemConsumerMetrics(MetricsRegistry registry) { + 
this.registry = registry; + } + + public void initializeMetrics(Set<String> streamNames) { + eventReadRates = streamNames.stream() + .collect(Collectors.toConcurrentMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE))); + eventByteReadRates = streamNames.stream() + .collect(Collectors.toConcurrentMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE))); + readLatencies = streamNames.stream() + .collect(Collectors.toConcurrentMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY))); + millisBehindLatest = streamNames.stream() + .collect(Collectors.toConcurrentMap(Function.identity(), + x -> new SamzaHistogram(registry, x, MILLIS_BEHIND_LATEST))); + + // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers. + synchronized (LOCK) { + if (aggEventReadRate == null) { + aggEventReadRate = registry.newCounter(AGGREGATE, EVENT_READ_RATE); + aggEventByteReadRate = registry.newCounter(AGGREGATE, EVENT_BYTE_READ_RATE); + aggReadLatency = new SamzaHistogram(registry, AGGREGATE, READ_LATENCY); + aggMillisBehindLatest = new SamzaHistogram(registry, AGGREGATE, MILLIS_BEHIND_LATEST); + } + } + } + + public void updateMillisBehindLatest(String stream, Long millisBehindLatest) { + this.millisBehindLatest.get(stream).update(millisBehindLatest); + aggMillisBehindLatest.update(millisBehindLatest); + } + + public void updateMetrics(String stream, Record record) { + eventReadRates.get(stream).inc(); + aggEventReadRate.inc(); + + long recordSize = record.getData().array().length + record.getPartitionKey().length(); + eventByteReadRates.get(stream).inc(recordSize); + aggEventByteReadRate.inc(recordSize); + + long latencyMs = Duration.between(Instant.now(), record.getApproximateArrivalTimestamp().toInstant()).toMillis(); + readLatencies.get(stream).update(latencyMs); + aggReadLatency.update(latencyMs); + } +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java new file mode 100644 index 0000000..29964dc --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.system.kinesis.metrics; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.MetricsRegistry; + +import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Snapshot; + + +class SamzaHistogram { + + private static final List<Double> DEFAULT_HISTOGRAM_PERCENTILES = Arrays.asList(50D, 99D); + private final MetricsRegistry registry; + private final Histogram histogram; + private final List<Double> percentiles; + private final Map<Double, Gauge<Double>> gauges; + + SamzaHistogram(MetricsRegistry registry, String group, String name) { + this(registry, group, name, DEFAULT_HISTOGRAM_PERCENTILES); + } + + SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) { + this.registry = registry; + this.histogram = new Histogram(new ExponentiallyDecayingReservoir()); + this.percentiles = percentiles; + this.gauges = percentiles.stream() + .filter(x -> x > 0 && x <= 100) + .collect( + Collectors.toMap(Function.identity(), x -> this.registry.newGauge(group, name + "_" + String.valueOf(0), 0D))); + } + + synchronized void update(long value) { + histogram.update(value); + Snapshot values = histogram.getSnapshot(); + percentiles.forEach(x -> gauges.get(x).set(values.getValue(x / 100))); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java new file mode 100644 index 0000000..93887ed --- /dev/null +++ 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kinesis; + +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestKinesisAWSCredentialsProvider { + + @Test + public void testCredentialsProviderWithNonNullKeys() { + String accessKey = "accessKey"; + String secretKey = "secretKey"; + KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(accessKey, secretKey); + assertEquals(credProvider.getCredentials().getAWSAccessKeyId(), accessKey); + assertEquals(credProvider.getCredentials().getAWSSecretKey(), secretKey); + } + + @Test + public void testCredentialsProviderWithNullAccessKey() { + String secretKey = "secretKey"; + KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(null, secretKey); + assertNull(credProvider.getCredentials().getAWSAccessKeyId()); + assertNull(credProvider.getCredentials().getAWSSecretKey()); + } + + @Test + public void testCredentialsProviderWithNullSecretKey() { + String accessKey = "accessKey"; + KinesisAWSCredentialsProvider credProvider = new 
KinesisAWSCredentialsProvider(accessKey, null); + assertNull(credProvider.getCredentials().getAWSAccessKeyId()); + assertNull(credProvider.getCredentials().getAWSSecretKey()); + } + + @Test + public void testCredentialsProviderWithNullKeys() { + KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(null, null); + assertNull(credProvider.getCredentials().getAWSAccessKeyId()); + assertNull(credProvider.getCredentials().getAWSSecretKey()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java new file mode 100644 index 0000000..56e4810 --- /dev/null +++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.system.kinesis; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.junit.Test; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; + +import static org.junit.Assert.*; + + +public class TestKinesisConfig { + @Test + public void testGetKinesisStreams() { + Map<String, String> kv = new HashMap<>(); + kv.put("systems.kinesis.streams.kinesis-stream1.prop1", "value1"); + kv.put("systems.kinesis.streams.kinesis-stream1.prop2", "value2"); + kv.put("systems.kinesis.streams.kinesis-stream2.prop1", "value3"); + + Config config = new MapConfig(kv); + KinesisConfig kConfig = new KinesisConfig(config); + + Set<String> streams = kConfig.getKinesisStreams("kinesis"); + assertEquals(2, streams.size()); + } + + @Test + public void testKinesisConfigs() { + Map<String, String> kv = new HashMap<>(); + String system = "kinesis"; + String stream = "kinesis-stream"; + String systemConfigPrefix = String.format("systems.%s.", system); + String ssConfigPrefix = String.format("systems.%s.streams.%s.", system, stream); + + kv.put("sensitive." 
+ ssConfigPrefix + "aws.secretKey", "secretKey"); + kv.put(systemConfigPrefix + "aws.region", "us-east-1"); + kv.put(ssConfigPrefix + "aws.accessKey", "accessKey"); + + Config config = new MapConfig(kv); + KinesisConfig kConfig = new KinesisConfig(config); + + assertEquals("us-east-1", kConfig.getRegion(system, stream).getName()); + assertEquals("accessKey", kConfig.getStreamAccessKey(system, stream)); + assertEquals("secretKey", kConfig.getStreamSecretKey(system, stream)); + } + + @Test + public void testAwsClientConfigs() { + Map<String, String> kv = new HashMap<>(); + String system = "kinesis"; + String systemConfigPrefix = String.format("systems.%s.", system); + + // Aws Client Configs + kv.put(systemConfigPrefix + "aws.clientConfig.ProxyHost", "hostName"); + kv.put(systemConfigPrefix + "aws.clientConfig.ProxyPort", "8080"); + + Config config = new MapConfig(kv); + KinesisConfig kConfig = new KinesisConfig(config); + + assertEquals("hostName", kConfig.getAWSClientConfig(system).getProxyHost()); + assertEquals(8080, kConfig.getAWSClientConfig(system).getProxyPort()); + } + + @Test + public void testKclConfigs() { + Map<String, String> kv = new HashMap<>(); + String system = "kinesis"; + String stream = "kinesis-stream"; + String systemConfigPrefix = String.format("systems.%s.", system); + + // region config is required for setting kcl config. 
+ kv.put(systemConfigPrefix + "aws.region", "us-east-1"); + + // Kcl Configs + kv.put(systemConfigPrefix + "aws.kcl.TableName", "sample-table"); + kv.put(systemConfigPrefix + "aws.kcl.MaxRecords", "100"); + kv.put(systemConfigPrefix + "aws.kcl.CallProcessRecordsEvenForEmptyRecordList", "true"); + kv.put(systemConfigPrefix + "aws.kcl.InitialPositionInStream", "TRIM_HORIZON"); + // override one of the Kcl configs for kinesis-stream1 + kv.put(systemConfigPrefix + "streams.kinesis-stream1.aws.kcl.InitialPositionInStream", "LATEST"); + + Config config = new MapConfig(kv); + KinesisConfig kConfig = new KinesisConfig(config); + KinesisClientLibConfiguration kclConfig = kConfig.getKinesisClientLibConfig(system, stream, "sample-app"); + + assertEquals("sample-table", kclConfig.getTableName()); + assertEquals(100, kclConfig.getMaxRecords()); + assertTrue(kclConfig.shouldCallProcessRecordsEvenForEmptyRecordList()); + assertEquals(InitialPositionInStream.TRIM_HORIZON, kclConfig.getInitialPositionInStream()); + + // verify if the overriden config is applied for kinesis-stream1 + kclConfig = kConfig.getKinesisClientLibConfig(system, "kinesis-stream1", "sample-app"); + assertEquals(InitialPositionInStream.LATEST, kclConfig.getInitialPositionInStream()); + } + + @Test + public void testgetKCLConfigWithUnknownConfigs() { + Map<String, String> kv = new HashMap<>(); + kv.put("systems.kinesis.aws.region", "us-east-1"); + kv.put("systems.kinesis.streams.kinesis-stream.aws.kcl.random", "value"); + + Config config = new MapConfig(kv); + KinesisConfig kConfig = new KinesisConfig(config); + + // Should not throw any exception and just ignore the unknown configs. + kConfig.getKinesisClientLibConfig("kinesis", "kinesis-stream", "sample-app"); + } +}