Repository: flink Updated Branches: refs/heads/master c11f11359 -> 8d6961bd9
[FLINK-9188] [kinesis] Generic mechanism to set ClientConfiguration properties. This closes #5889. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fbcadbe6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fbcadbe6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fbcadbe6 Branch: refs/heads/master Commit: fbcadbe6a696041aa58ccfa80b7fbd3c70f203a4 Parents: c11f113 Author: Thomas Weise <[email protected]> Authored: Mon Apr 16 22:01:52 2018 -0700 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Wed May 2 20:04:31 2018 +0800 ---------------------------------------------------------------------- .../connectors/kinesis/proxy/KinesisProxy.java | 5 +- .../connectors/kinesis/util/AWSUtil.java | 47 ++++++++++++ .../BeanDeserializerModifierForIgnorables.java | 79 ++++++++++++++++++++ .../kinesis/proxy/KinesisProxyTest.java | 15 ++++ 4 files changed, 145 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fbcadbe6/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index 3486206..09e9d4c 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -186,7 +186,10 @@ public class KinesisProxy implements KinesisProxyInterface { * @return */ protected AmazonKinesis createKinesisClient(Properties configProps) { - return AWSUtil.createKinesisClient(configProps, new ClientConfigurationFactory().getConfig()); + + ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig(); + AWSUtil.setAwsClientConfigProperties(awsClientConfig, configProps); + return AWSUtil.createKinesisClient(configProps, awsClientConfig); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/fbcadbe6/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java index 2e9090e..cfddfa2 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java @@ -35,7 +35,16 @@ import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.deser.BeanDeserializerFactory; +import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier; +import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext; +import com.fasterxml.jackson.databind.deser.DeserializerFactory; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; /** @@ -159,4 +168,42 @@ public class AWSUtil { } return true; } + + /** + * The prefix used for properties that should be applied to {@link ClientConfiguration}. + */ + public static final String AWS_CLIENT_CONFIG_PREFIX = "aws.clientconfig."; + + /** + * Set all prefixed properties on {@link ClientConfiguration}. + * @param config + * @param configProps + */ + public static void setAwsClientConfigProperties(ClientConfiguration config, + Properties configProps) { + + Map<String, Object> awsConfigProperties = new HashMap<>(); + for (Map.Entry<Object, Object> entry : configProps.entrySet()) { + String key = (String) entry.getKey(); + if (key.startsWith(AWS_CLIENT_CONFIG_PREFIX)) { + awsConfigProperties.put(key.substring(AWS_CLIENT_CONFIG_PREFIX.length()), entry.getValue()); + } + } + // Jackson does not like the following properties + String[] ignorableProperties = {"secureRandom"}; + BeanDeserializerModifier modifier = new BeanDeserializerModifierForIgnorables( + ClientConfiguration.class, ignorableProperties); + DeserializerFactory factory = BeanDeserializerFactory.instance.withDeserializerModifier( + modifier); + ObjectMapper mapper = new ObjectMapper(null, null, + new DefaultDeserializationContext.Impl(factory)); + + JsonNode propTree = mapper.convertValue(awsConfigProperties, JsonNode.class); + try { + mapper.readerForUpdating(config).readValue(propTree); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/fbcadbe6/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/BeanDeserializerModifierForIgnorables.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/BeanDeserializerModifierForIgnorables.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/BeanDeserializerModifierForIgnorables.java new file mode 100644 index 0000000..51b80f3 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/BeanDeserializerModifierForIgnorables.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.util; + +import com.fasterxml.jackson.databind.BeanDescription; +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.deser.BeanDeserializerBuilder; +import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier; +import com.fasterxml.jackson.databind.introspect.BeanPropertyDefinition; + +import java.util.ArrayList; +import java.util.List; + +/** + * Jackson bean deserializer utility that allows skipping of properties, for example because they + * cannot be handled by the default serializer or should be ignored for other reason. + * + * <p>Original source: + * https://stackoverflow.com/questions/12305438/jackson-dynamic-filtering-of-properties-during-deserialization + */ +public class BeanDeserializerModifierForIgnorables extends BeanDeserializerModifier { + + private Class<?> type; + private List<String> ignorables; + + public BeanDeserializerModifierForIgnorables(Class clazz, String... properties) { + ignorables = new ArrayList<>(); + for (String property : properties) { + ignorables.add(property); + } + this.type = clazz; + } + + @Override + public BeanDeserializerBuilder updateBuilder( + DeserializationConfig config, BeanDescription beanDesc, + BeanDeserializerBuilder builder) { + if (!type.equals(beanDesc.getBeanClass())) { + return builder; + } + + for (String ignorable : ignorables) { + builder.addIgnorable(ignorable); + } + return builder; + } + + @Override + public List<BeanPropertyDefinition> updateProperties( + DeserializationConfig config, BeanDescription beanDesc, + List<BeanPropertyDefinition> propDefs) { + if (!type.equals(beanDesc.getBeanClass())) { + return propDefs; + } + + List<BeanPropertyDefinition> newPropDefs = new ArrayList<>(); + for (BeanPropertyDefinition propDef : propDefs) { + if (!ignorables.contains(propDef.getName())) { + newPropDefs.add(propDef); + } + } + return newPropDefs; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fbcadbe6/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java index c84d89b..25f4381 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java @@ -86,4 +86,19 @@ public class KinesisProxyTest { assertEquals(10000, clientConfiguration.getSocketTimeout()); } + @Test + public void testClientConfigOverride() { + + Properties configProps = new Properties(); + configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); + configProps.setProperty(AWSUtil.AWS_CLIENT_CONFIG_PREFIX + "socketTimeout", "9999"); + + KinesisProxyInterface proxy = KinesisProxy.create(configProps); + + AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient"); + ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, + "clientConfiguration"); + assertEquals(9999, clientConfiguration.getSocketTimeout()); + } + }
