This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4634311e751 [improve][io] Improve kinesis connector config. (#21004)
4634311e751 is described below
commit 4634311e75176f9acec16e37e2900945a8e2040e
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Aug 22 15:31:46 2023 +0800
[improve][io] Improve kinesis connector config. (#21004)
---
.../io/kinesis/AwsCredentialProviderPlugin.java | 29 -----------
.../io/kinesis/AwsDefaultProviderChainPlugin.java | 30 ------------
.../pulsar/io/kinesis/BaseKinesisConfig.java | 7 +++
.../org/apache/pulsar/io/kinesis/KinesisSink.java | 17 +++----
.../pulsar/io/kinesis/KinesisSinkConfig.java | 18 ++++---
.../apache/pulsar/io/kinesis/KinesisSource.java | 17 +------
.../pulsar/io/kinesis/KinesisSourceConfig.java | 36 +++++++-------
.../io/kinesis/STSAssumeRoleProviderPlugin.java | 31 ------------
.../pulsar/io/kinesis/KinesisSinkConfigTests.java | 33 ++++---------
.../io/kinesis/KinesisSourceConfigTests.java | 56 ++++++----------------
.../kinesis/src/test/resources/sinkConfig.yaml | 27 -----------
.../kinesis/src/test/resources/sourceConfig.yaml | 32 -------------
12 files changed, 71 insertions(+), 262 deletions(-)
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
deleted file mode 100644
index e88a952293b..00000000000
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.kinesis;
-
-/**
- * This is a stub class for backwards compatibility. In new code and
configurations, please use the plugins
- * from org.apache.pulsar.io.aws
- *
- * @see org.apache.pulsar.io.aws.AwsCredentialProviderPlugin
- */
-@Deprecated
-public interface AwsCredentialProviderPlugin extends
org.apache.pulsar.io.aws.AwsCredentialProviderPlugin {
-}
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
deleted file mode 100644
index 75952a71a29..00000000000
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.kinesis;
-
-/**
- * This is a stub class for backwards compatibility. In new code and
configurations, please use the plugins
- * from org.apache.pulsar.io.aws
- *
- * @see org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin
- */
-@Deprecated
-public class AwsDefaultProviderChainPlugin extends
org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin
- implements AwsCredentialProviderPlugin {
-}
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
index c9c951ae2b7..7bd95b0d6e3 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
@@ -35,6 +35,13 @@ public abstract class BaseKinesisConfig implements
Serializable {
)
private String awsEndpoint = "";
+ @FieldDoc(
+ required = false,
+ defaultValue = "",
+ help = "Cloudwatch end-point url. It can be found at "
+ +
"https://docs.aws.amazon.com/general/latest/gr/rande.html"
+ )
+ private String cloudwatchEndpoint = "";
@FieldDoc(
required = false,
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index fb8eedff82f..d8e4e4bab85 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.io.kinesis;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -49,7 +48,6 @@ import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
-import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
@@ -155,17 +153,16 @@ public class KinesisSink extends AbstractAwsConnector
implements Sink<GenericObj
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
- kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config,
KinesisSinkConfig.class, sinkContext);
+ kinesisSinkConfig = KinesisSinkConfig.load(config, sinkContext);
this.sinkContext = sinkContext;
- checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()),
"empty kinesis-stream name");
- checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint())
- || isNotBlank(kinesisSinkConfig.getAwsRegion()),
- "Either the aws-end-point or aws-region must be set");
-
checkArgument(isNotBlank(kinesisSinkConfig.getAwsCredentialPluginParam()),
"empty aws-credential param");
-
KinesisProducerConfiguration kinesisConfig = new
KinesisProducerConfiguration();
- kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint());
+ if (isNotBlank(kinesisSinkConfig.getAwsEndpoint())) {
+
kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint());
+ }
+ if (isNotBlank(kinesisSinkConfig.getCloudwatchEndpoint())) {
+
kinesisConfig.setCloudwatchEndpoint(kinesisSinkConfig.getCloudwatchEndpoint());
+ }
if (kinesisSinkConfig.getAwsEndpointPort() != null) {
kinesisConfig.setKinesisPort(kinesisSinkConfig.getAwsEndpointPort());
}
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index c5b26a26d0c..f81fd32134b 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -18,13 +18,14 @@
*/
package org.apache.pulsar.io.kinesis;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import java.io.File;
-import java.io.IOException;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.Serializable;
+import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
@Data
@@ -103,9 +104,12 @@ public class KinesisSinkConfig extends BaseKinesisConfig
implements Serializable
help = "The maximum delay(in milliseconds) between retries.")
private long retryMaxDelayInMillis = 60000;
- public static KinesisSinkConfig load(String yamlFile) throws IOException {
- ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- return mapper.readValue(new File(yamlFile), KinesisSinkConfig.class);
+ public static KinesisSinkConfig load(Map<String, Object> config,
SinkContext sinkContext) {
+ KinesisSinkConfig kinesisSinkConfig =
IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext);
+ checkArgument(isNotBlank(kinesisSinkConfig.getAwsRegion())
+ || (isNotBlank(kinesisSinkConfig.getAwsEndpoint()) &&
isNotBlank(kinesisSinkConfig.getCloudwatchEndpoint())),
+ "Either \"awsRegion\" must be set OR all of [\"awsEndpoint\",
\"cloudwatchEndpoint\"] must be set.");
+ return kinesisSinkConfig;
}
public enum MessageFormat {
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
index 2412244e1b5..279368db2a0 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.io.kinesis;
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.net.InetAddress;
import java.util.Map;
import java.util.UUID;
@@ -27,14 +25,12 @@ import java.util.concurrent.LinkedBlockingQueue;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
-import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
-import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
@@ -68,18 +64,7 @@ public class KinesisSource extends AbstractAwsConnector
implements Source<byte[]
@Override
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
- this.kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config,
KinesisSourceConfig.class, sourceContext);
-
-
checkArgument(isNotBlank(kinesisSourceConfig.getAwsKinesisStreamName()), "empty
kinesis-stream name");
- checkArgument(isNotBlank(kinesisSourceConfig.getAwsEndpoint())
- || isNotBlank(kinesisSourceConfig.getAwsRegion()),
- "Either the aws-end-point or aws-region must be set");
-
checkArgument(isNotBlank(kinesisSourceConfig.getAwsCredentialPluginParam()),
"empty aws-credential param");
-
- if (kinesisSourceConfig.getInitialPositionInStream() ==
InitialPositionInStream.AT_TIMESTAMP) {
- checkArgument((kinesisSourceConfig.getStartAtTime() != null),
"Timestamp must be specified");
- }
-
+ this.kinesisSourceConfig = KinesisSourceConfig.load(config,
sourceContext);
queue = new
LinkedBlockingQueue<>(kinesisSourceConfig.getReceiveQueueSize());
workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" +
UUID.randomUUID();
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
index f0bf7cfc978..0dd9bfce9e0 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
@@ -18,16 +18,17 @@
*/
package org.apache.pulsar.io.kinesis;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import java.io.File;
-import java.io.IOException;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.Serializable;
import java.net.URI;
import java.util.Date;
+import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
@@ -76,7 +77,7 @@ public class KinesisSourceConfig extends BaseKinesisConfig
implements Serializab
@FieldDoc(
required = false,
- defaultValue = "Apache Pulsar IO Connector",
+ defaultValue = "pulsar-kinesis",
help = "Name of the Amazon Kinesis application. By default the
application name is included "
+ "in the user agent string used to make AWS requests. This
can assist with troubleshooting "
+ "(e.g. distinguish requests made by separate connectors
instances)."
@@ -122,13 +123,6 @@ public class KinesisSourceConfig extends BaseKinesisConfig
implements Serializab
)
private String dynamoEndpoint = "";
- @FieldDoc(
- required = false,
- defaultValue = "",
- help = "Cloudwatch end-point url. It can be found at
https://docs.aws.amazon.com/general/latest/gr/rande.html"
- )
- private String cloudwatchEndpoint = "";
-
@FieldDoc(
required = false,
defaultValue = "true",
@@ -136,10 +130,20 @@ public class KinesisSourceConfig extends
BaseKinesisConfig implements Serializab
)
private boolean useEnhancedFanOut = true;
-
- public static KinesisSourceConfig load(String yamlFile) throws IOException
{
- ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- return mapper.readValue(new File(yamlFile), KinesisSourceConfig.class);
+ public static KinesisSourceConfig load(Map<String, Object> config,
SourceContext sourceContext) {
+ KinesisSourceConfig kinesisSourceConfig =
IOConfigUtils.loadWithSecrets(config,
+ KinesisSourceConfig.class, sourceContext);
+ boolean isNotBlankEndpoint =
isNotBlank(kinesisSourceConfig.getAwsEndpoint())
+ && isNotBlank(kinesisSourceConfig.getCloudwatchEndpoint())
+ && isNotBlank(kinesisSourceConfig.getDynamoEndpoint());
+ checkArgument(isNotBlank(kinesisSourceConfig.getAwsRegion()) ||
isNotBlankEndpoint,
+ "Either \"awsRegion\" must be set OR all of "
+ + "[ \"awsEndpoint\", \"cloudwatchEndpoint\", and
\"dynamoEndpoint\" ] must be set.");
+ if (kinesisSourceConfig.getInitialPositionInStream() ==
InitialPositionInStream.AT_TIMESTAMP) {
+ checkArgument((kinesisSourceConfig.getStartAtTime() != null),
+ "When initialPositionInStream is AT_TIMESTAMP, startAtTime
must be specified");
+ }
+ return kinesisSourceConfig;
}
public KinesisAsyncClient
buildKinesisAsyncClient(AwsCredentialProviderPlugin credPlugin) {
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java
deleted file mode 100644
index e305c9c9b9f..00000000000
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.kinesis;
-
-/**
- * This is a stub class for backwards compatibility. In new code and
configurations, please use the plugins
- * from org.apache.pulsar.io.aws
- *
- * @see org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin
- */
-@Deprecated
-public class STSAssumeRoleProviderPlugin extends
org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin
- implements AwsCredentialProviderPlugin {
-}
-
diff --git
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java
index 6f76d9e69a2..a5051927ace 100644
---
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java
+++
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java
@@ -21,34 +21,16 @@ package org.apache.pulsar.io.kinesis;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
import org.mockito.Mockito;
import org.testng.annotations.Test;
public class KinesisSinkConfigTests {
- @Test
- public final void loadFromYamlFileTest() throws IOException {
- File yamlFile = getFile("sinkConfig.yaml");
- KinesisSinkConfig config =
KinesisSinkConfig.load(yamlFile.getAbsolutePath());
-
- assertNotNull(config);
- assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
- assertEquals(config.getAwsRegion(), "us-east-1");
- assertEquals(config.getAwsKinesisStreamName(), "my-stream");
- assertEquals(config.getAwsCredentialPluginParam(),
- "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
- assertEquals(config.getMessageFormat(),
MessageFormat.ONLY_RAW_PAYLOAD);
- assertEquals(true, config.isRetainOrdering());
- }
-
@Test
public final void loadFromMapTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
@@ -58,7 +40,7 @@ public class KinesisSinkConfigTests {
map.put("awsCredentialPluginParam",
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
SinkContext sinkContext = Mockito.mock(SinkContext.class);
- KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map,
KinesisSinkConfig.class, sinkContext);
+ KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext);
assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
@@ -78,7 +60,7 @@ public class KinesisSinkConfigTests {
SinkContext sinkContext = Mockito.mock(SinkContext.class);
Mockito.when(sinkContext.getSecret("awsCredentialPluginParam"))
.thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
- KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map,
KinesisSinkConfig.class, sinkContext);
+ KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext);
assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
@@ -88,8 +70,13 @@ public class KinesisSinkConfigTests {
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
}
- private File getFile(String name) {
- ClassLoader classLoader = getClass().getClassLoader();
- return new File(classLoader.getResource(name).getFile());
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public final void missCloudWatchEndpointTest() {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("awsEndpoint", "https://some.endpoint.aws");
+ map.put("awsKinesisStreamName", "my-stream");
+ map.put("awsCredentialPluginParam",
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ KinesisSinkConfig.load(map, sinkContext);
}
}
diff --git
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java
index f6b0666d34b..4ba3593b1d9 100644
---
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java
+++
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.io.kinesis;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import java.io.File;
import java.io.IOException;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
@@ -30,7 +29,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SourceContext;
import org.mockito.Mockito;
import org.testng.annotations.Test;
@@ -54,30 +52,6 @@ public class KinesisSourceConfigTests {
DAY = then.getTime();
}
- @Test
- public final void loadFromYamlFileTest() throws IOException {
- File yamlFile = getFile("sourceConfig.yaml");
- KinesisSourceConfig config =
KinesisSourceConfig.load(yamlFile.getAbsolutePath());
- assertNotNull(config);
- assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
- assertEquals(config.getAwsRegion(), "us-east-1");
- assertEquals(config.getAwsKinesisStreamName(), "my-stream");
- assertEquals(config.getAwsCredentialPluginParam(),
- "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
- assertEquals(config.getApplicationName(), "My test application");
- assertEquals(config.getCheckpointInterval(), 30000);
- assertEquals(config.getBackoffTime(), 4000);
- assertEquals(config.getNumRetries(), 3);
- assertEquals(config.getReceiveQueueSize(), 2000);
- assertEquals(config.getInitialPositionInStream(),
InitialPositionInStream.TRIM_HORIZON);
-
- Calendar cal = Calendar.getInstance();
- cal.setTime(config.getStartAtTime());
- ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(),
ZoneOffset.UTC);
- ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(),
ZoneOffset.UTC);
- assertEquals(actual, expected);
- }
-
@Test
public final void loadFromMapTest() throws IOException {
Map<String, Object> map = new HashMap<String, Object> ();
@@ -89,12 +63,11 @@ public class KinesisSourceConfigTests {
map.put("backoffTime", "4000");
map.put("numRetries", "3");
map.put("receiveQueueSize", 2000);
- map.put("applicationName", "My test application");
map.put("initialPositionInStream",
InitialPositionInStream.TRIM_HORIZON);
map.put("startAtTime", DAY);
SourceContext sourceContext = Mockito.mock(SourceContext.class);
- KinesisSourceConfig config = IOConfigUtils.loadWithSecrets(map,
KinesisSourceConfig.class, sourceContext);
+ KinesisSourceConfig config = KinesisSourceConfig.load(map,
sourceContext);
assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
@@ -102,7 +75,7 @@ public class KinesisSourceConfigTests {
assertEquals(config.getAwsKinesisStreamName(), "my-stream");
assertEquals(config.getAwsCredentialPluginParam(),
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
- assertEquals(config.getApplicationName(), "My test application");
+ assertEquals(config.getApplicationName(), "pulsar-kinesis");
assertEquals(config.getCheckpointInterval(), 30000);
assertEquals(config.getBackoffTime(), 4000);
assertEquals(config.getNumRetries(), 3);
@@ -133,7 +106,7 @@ public class KinesisSourceConfigTests {
SourceContext sourceContext = Mockito.mock(SourceContext.class);
Mockito.when(sourceContext.getSecret("awsCredentialPluginParam"))
.thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
- KinesisSourceConfig config = IOConfigUtils.loadWithSecrets(map,
KinesisSourceConfig.class, sourceContext);
+ KinesisSourceConfig config = KinesisSourceConfig.load(map,
sourceContext);
assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
@@ -156,19 +129,17 @@ public class KinesisSourceConfigTests {
}
@Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "empty aws-credential param")
+ expectedExceptionsMessageRegExp = "awsCredentialPluginParam cannot
be null")
public final void missingCredentialsTest() throws Exception {
Map<String, Object> map = new HashMap<String, Object> ();
map.put("awsEndpoint", "https://some.endpoint.aws");
map.put("awsRegion", "us-east-1");
map.put("awsKinesisStreamName", "my-stream");
-
- KinesisSource source = new KinesisSource();
- source.open(map, null);
+ KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class));
}
@Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "Timestamp must be specified")
+ expectedExceptionsMessageRegExp = "When initialPositionInStream is
AT_TIMESTAMP, startAtTime must be specified")
public final void missingStartTimeTest() throws Exception {
Map<String, Object> map = new HashMap<String, Object> ();
map.put("awsEndpoint", "https://some.endpoint.aws");
@@ -177,13 +148,16 @@ public class KinesisSourceConfigTests {
map.put("awsCredentialPluginParam",
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
map.put("initialPositionInStream",
InitialPositionInStream.AT_TIMESTAMP);
-
- KinesisSource source = new KinesisSource();
- source.open(map, null);
+ KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class));
}
- private File getFile(String name) {
- ClassLoader classLoader = getClass().getClassLoader();
- return new File(classLoader.getResource(name).getFile());
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public final void missCloudWatchEndpointTest() {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("awsEndpoint", "https://some.endpoint.aws");
+ map.put("awsKinesisStreamName", "my-stream");
+ map.put("awsCredentialPluginParam",
+ "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+ KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class));
}
}
diff --git a/pulsar-io/kinesis/src/test/resources/sinkConfig.yaml
b/pulsar-io/kinesis/src/test/resources/sinkConfig.yaml
deleted file mode 100644
index 7d99db65d07..00000000000
--- a/pulsar-io/kinesis/src/test/resources/sinkConfig.yaml
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-{
- "awsEndpoint" : "https://some.endpoint.aws",
- "awsRegion": "us-east-1",
- "awsKinesisStreamName": "my-stream",
- "awsCredentialPluginParam":
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
- "messageFormat": "ONLY_RAW_PAYLOAD",
- "retainOrdering": "true"
-}
\ No newline at end of file
diff --git a/pulsar-io/kinesis/src/test/resources/sourceConfig.yaml
b/pulsar-io/kinesis/src/test/resources/sourceConfig.yaml
deleted file mode 100644
index 64b564486c1..00000000000
--- a/pulsar-io/kinesis/src/test/resources/sourceConfig.yaml
+++ /dev/null
@@ -1,32 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-{
- "awsEndpoint" : "https://some.endpoint.aws",
- "awsRegion": "us-east-1",
- "awsKinesisStreamName": "my-stream",
- "awsCredentialPluginParam":
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
- "applicationName": "My test application",
- "checkpointInterval": "30000",
- "backoffTime":"4000",
- "numRetries":"3",
- "receiveQueueSize": 2000,
- "initialPositionInStream": "TRIM_HORIZON",
- "startAtTime": "2019-03-05T19:28:58.000Z"
-}
\ No newline at end of file