This is an automated email from the ASF dual-hosted git repository. jasonhuynh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 5b5d659 All configurable parameters are now using '-' as delimiter instead of camelCase Added documentation to the configDef definitions Updated readme with latest configuration changes 5b5d659 is described below commit 5b5d6596aa336ba7f01829f4ee2d2f023eb2b685 Author: Jason Huynh <huyn...@gmail.com> AuthorDate: Thu Feb 13 09:28:00 2020 -0800 All configurable parameters are now using '-' as delimiter instead of camelCase Added documentation to the configDef definitions Updated readme with latest configuration changes --- README.md | 30 +++++++------- .../java/org/geode/kafka/GeodeConnectorConfig.java | 37 ++++++++++------- src/main/java/org/geode/kafka/GeodeContext.java | 25 ++++++------ .../kafka/security/SystemPropertyAuthInit.java | 1 - .../java/org/geode/kafka/sink/GeodeKafkaSink.java | 6 +-- .../org/geode/kafka/sink/GeodeKafkaSinkTask.java | 6 ++- .../geode/kafka/sink/GeodeSinkConnectorConfig.java | 15 ++++--- .../org/geode/kafka/source/GeodeKafkaSource.java | 9 +++-- .../geode/kafka/source/GeodeKafkaSourceTask.java | 16 +++++--- .../kafka/source/GeodeSourceConnectorConfig.java | 46 ++++++++++++++-------- .../org/geode/kafka/GeodeConnectorConfigTest.java | 15 ++++--- .../org/geode/kafka/WorkerAndHerderWrapper.java | 4 +- .../kafka/source/GeodeKafkaSourceTaskTest.java | 17 +++----- 13 files changed, 132 insertions(+), 95 deletions(-) diff --git a/README.md b/README.md index 39dd423..73953a1 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ plugin.path=(Path to your clone)/geode-kafka-connector/build/libs/ name=geode-kafka-sink connector.class=GeodeKafkaSink tasks.max=1 -topicToRegions=[someTopicToSinkFrom:someRegionToConsume] +topic-to-regions=[someTopicToSinkFrom:someRegionToConsume] topics=someTopicToSinkFrom locators=localHost[10334] ``` @@ -35,7 +35,7 @@ locators=localHost[10334] name=geode-kafka-source connector.class=GeodeKafkaSource tasks.max=1 -regionToTopics=[someRegionToSourceFrom:someTopicToConsume] +region-to-topics=[someRegionToSourceFrom:someTopicToConsume] locators=localHost[10334] ``` @@ -47,27 +47,29 @@ bin/connect-standalone.sh config/connect-standalone.properties config/connect-ge #### GeodeKafkaSink Properties | Property | Required | Description| Default | |---|---|---|---| -| locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] | -|topicToRegions| yes| A comma separated list of "one topic to many regions" bindings. Each binding is surrounded by brackets. For example "[topicName:regionName], [anotherTopic: regionName, anotherRegion]" | None. This is required to be set in the source connector properties +|locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] | +|topic-to-regions| yes| A comma separated list of "one topic to many regions" bindings. Each binding is surrounded by brackets. For example "[topicName:regionName], [anotherTopic: regionName, anotherRegion]" | "[gkctopic:gkcregion]" |security-client-auth-init| no | Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html) -|nullValuesMeanRemove | no | If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region | true | +|null-values-mean-remove | no | If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region | true | -* The topicToRegions property allows us to create mappings between topics and regions. A single one-to-one mapping would look similar to "[topic:region]" A one-to-many mapping can be made by comma separating the regions, for example "[topic:region1,region2]" This is equivalent to both regions being consumers of the topic. +* The topic-to-regions property allows us to create mappings between topics and regions. A single one-to-one mapping would look similar to "[topic:region]" A one-to-many mapping can be made by comma separating the regions, for example "[topic:region1,region2]" This is equivalent to both regions being consumers of the topic. #### GeodeKafkaSource Properties | Property | Required| Description| Default | |---|---|---|---| | locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] | -|regionToTopics| yes | A comma separated list of "one region to many topics" mappings. Each mapping is surrounded by brackets. For example "[regionName:topicName], "[anotherRegion: topicName, anotherTopic]" | None. This is required to be set in the source connector properties| +|region-to-topics| yes | A comma separated list of "one region to many topics" mappings. Each mapping is surrounded by brackets. For example "[regionName:topicName], "[anotherRegion: topicName, anotherTopic]" | "[gkcregion:gkctopic]"| |security-client-auth-init| no | Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html) -|geodeConnectorBatchSize| no | Maximum number of records to return on each poll| 100 | -|geodeConnectorQueueSize| no | Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue | 10000 | -| loadEntireRegion| no| Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq| true | -|durableClientIdPrefix| no | Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client | "" | -| durableClientTimeout| no | How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated| 60000 | -| cqPrefix| no| Prefix string to identify Connector cq's on a Geode server |cqForGeodeKafka | +|security-username| no | Supply a username to be used to authenticate with Geode. Will autoset the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user| null| +|security-password| no | Supply a password to be used to authenticate with Geode| null| +|geode-connector-batch-size| no | Maximum number of records to return on each poll| 100 | +|geode-connector-queue-size| no | Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue | 10000 | +| load-entire-region| no| Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq| true | +|durable-client-id-prefix| no | Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client | "" | +| durable-client-timeout| no | How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated| 60000 | +| cq-prefix| no| Prefix string to identify Connector cq's on a Geode server |cqForGeodeKafka | -* The regionToTopics property allows us to create mappings between regions and topics. A single one-to-one mapping would look similar to "[region:topic]" A one-to-many mapping can be made by comma separating the topics, for example "[region:topic1,topic2]" This is equivalent to the region be a producer for both topics +* The region-to-topics property allows us to create mappings between regions and topics. A single one-to-one mapping would look similar to "[region:topic]" A one-to-many mapping can be made by comma separating the topics, for example "[region:topic1,topic2]" This is equivalent to the region be a producer for both topics --- diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java index 2860a8f..cc151a4 100644 --- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java +++ b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java @@ -14,9 +14,6 @@ */ package org.geode.kafka; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; - import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -24,6 +21,9 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + public class GeodeConnectorConfig extends AbstractConfig { // GeodeKafka Specific Configuration @@ -37,9 +37,10 @@ public class GeodeConnectorConfig extends AbstractConfig { public static final String LOCATORS = "locators"; public static final String DEFAULT_LOCATOR = "localhost[10334]"; public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init"; - private static final String DEFAULT_SECURITY_AUTH_INIT = "org.geode.kafka.security.SystemPropertyAuthInit"; + private static final String DEFAULT_SECURITY_AUTH_INIT = + "org.geode.kafka.security.SystemPropertyAuthInit"; public static final String SECURITY_USER = "security-username"; - public static final String SECURITY_PASSWORD= "security-password"; + public static final String SECURITY_PASSWORD = "security-password"; protected final int taskId; protected List<LocatorHostPort> locatorHostPorts; @@ -47,13 +48,13 @@ public class GeodeConnectorConfig extends AbstractConfig { private String securityUserName; private String securityPassword; - //Just for testing + // Just for testing protected GeodeConnectorConfig() { super(new ConfigDef(), new HashMap()); taskId = 0; } - //Just for testing + // Just for testing protected GeodeConnectorConfig(Map<String, String> props) { super(new ConfigDef(), props); taskId = 0; @@ -67,19 +68,27 @@ public class GeodeConnectorConfig extends AbstractConfig { securityUserName = getString(SECURITY_USER); securityPassword = getString(SECURITY_PASSWORD); securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT); - //if we registered a username/password instead of auth init, we should use the default auth init if one isn't specified + // if we registered a username/password instead of auth init, we should use the default auth + // init if one isn't specified if (usesSecurity()) { - securityClientAuthInit = securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT; + securityClientAuthInit = + securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT; } } protected static ConfigDef configurables() { ConfigDef configDef = new ConfigDef(); - configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM,""); - configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH, ""); - configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, ""); - configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, ""); - configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, ""); + configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM, + "Internally used to identify each task"); + configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH, + "A comma separated string of locators that configure which locators to connect to"); + configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, + "Supply a username to be used to authenticate with Geode. Will autoset the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user"); + configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, + "Supply a password to be used to authenticate with Geode"); + configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null, + ConfigDef.Importance.HIGH, + "Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)"); return configDef; } diff --git a/src/main/java/org/geode/kafka/GeodeContext.java b/src/main/java/org/geode/kafka/GeodeContext.java index 6190ef2..9f30242 100644 --- a/src/main/java/org/geode/kafka/GeodeContext.java +++ b/src/main/java/org/geode/kafka/GeodeContext.java @@ -14,10 +14,12 @@ */ package org.geode.kafka; -import java.util.Collection; +import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT; +import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD; +import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER; + import java.util.List; -import org.apache.geode.cache.query.CqResults; import org.apache.kafka.connect.errors.ConnectException; import org.apache.geode.cache.client.ClientCache; @@ -26,12 +28,9 @@ import org.apache.geode.cache.query.CqAttributes; import org.apache.geode.cache.query.CqException; import org.apache.geode.cache.query.CqExistsException; import org.apache.geode.cache.query.CqQuery; +import org.apache.geode.cache.query.CqResults; import org.apache.geode.cache.query.RegionNotFoundException; -import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT; -import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD; -import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER; - public class GeodeContext { private ClientCache clientCache; @@ -40,15 +39,18 @@ public class GeodeContext { public GeodeContext() {} public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, - String durableClientId, String durableClientTimeout, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) { + String durableClientId, String durableClientTimeout, String securityAuthInit, + String securityUserName, String securityPassword, boolean usesSecurity) { clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout, securityAuthInit, securityUserName, securityPassword, usesSecurity); return clientCache; } public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, - String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) { - clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName, securityPassword, usesSecurity); + String securityAuthInit, String securityUserName, String securityPassword, + boolean usesSecurity) { + clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName, + securityPassword, usesSecurity); return clientCache; } @@ -57,7 +59,8 @@ public class GeodeContext { } public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, - String durableClientTimeOut, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) { + String durableClientTimeOut, String securityAuthInit, String securityUserName, + String securityPassword, boolean usesSecurity) { ClientCacheFactory ccf = new ClientCacheFactory(); if (usesSecurity) { @@ -93,7 +96,7 @@ public class GeodeContext { } public CqResults newCqWithInitialResults(String name, String query, CqAttributes cqAttributes, - boolean isDurable) throws ConnectException { + boolean isDurable) throws ConnectException { try { CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable); return cq.executeWithInitialResults(); diff --git a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java index cc525a2..6b646ee 100644 --- a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java +++ b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java @@ -19,7 +19,6 @@ import java.util.Properties; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.security.AuthInitialize; import org.apache.geode.security.AuthenticationFailedException; -import org.geode.kafka.GeodeConnectorConfig; public class SystemPropertyAuthInit implements AuthInitialize { diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java index a8985c2..9ee5189 100644 --- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java +++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java @@ -14,17 +14,17 @@ */ package org.geode.kafka.sink; +import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.geode.kafka.GeodeConnectorConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; - -import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF; +import org.geode.kafka.GeodeConnectorConfig; public class GeodeKafkaSink extends SinkConnector { private Map<String, String> sharedProps; diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java index 7db384f..be44356 100644 --- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java +++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java @@ -20,9 +20,9 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.geode.kafka.GeodeContext; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; +import org.geode.kafka.GeodeContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +61,9 @@ public class GeodeKafkaSinkTask extends SinkTask { configure(geodeConnectorConfig); geodeContext = new GeodeContext(); geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), - geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity()); + geodeConnectorConfig.getSecurityClientAuthInit(), + geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), + geodeConnectorConfig.usesSecurity()); regionNameToRegion = createProxyRegions(topicToRegions.values()); } catch (Exception e) { logger.error("Unable to start sink task", e); diff --git a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java index bb51b0e..a074220 100644 --- a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java +++ b/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java @@ -14,20 +14,19 @@ */ package org.geode.kafka.sink; -import org.apache.kafka.common.config.ConfigDef; - import java.util.List; import java.util.Map; +import org.apache.kafka.common.config.ConfigDef; import org.geode.kafka.GeodeConnectorConfig; public class GeodeSinkConnectorConfig extends GeodeConnectorConfig { public static final ConfigDef SINK_CONFIG_DEF = configurables(); // Used by sink - public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions"; + public static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions"; public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]"; - public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove"; + public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove"; public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true"; private Map<String, List<String>> topicToRegions; @@ -41,8 +40,12 @@ public class GeodeSinkConnectorConfig extends GeodeConnectorConfig { protected static ConfigDef configurables() { ConfigDef configDef = GeodeConnectorConfig.configurables(); - configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING, DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH, ""); - configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN, DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM, ""); + configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING, + DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH, + "A comma separated list of \"one topic to many regions\" bindings. Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]"); + configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN, + DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM, + "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region"); return configDef; } diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java index dac94f6..7b4445e 100644 --- a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java +++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java @@ -14,19 +14,19 @@ */ package org.geode.kafka.source; +import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.geode.kafka.GeodeConnectorConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.util.ConnectorUtils; - -import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF; +import org.geode.kafka.GeodeConnectorConfig; public class GeodeKafkaSource extends SourceConnector { @@ -43,7 +43,8 @@ public class GeodeKafkaSource extends SourceConnector { List<Map<String, String>> taskConfigs = new ArrayList<>(); List<String> bindings = GeodeConnectorConfig - .parseStringByComma(sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS)); + .parseStringByComma( + sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS)); List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks); for (int i = 0; i < maxTasks; i++) { diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java index b1c289f..4acc081 100644 --- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java +++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java @@ -21,17 +21,16 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.apache.geode.cache.query.CqResults; -import org.apache.geode.cache.query.Struct; -import org.geode.kafka.GeodeContext; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; +import org.geode.kafka.GeodeContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.geode.cache.query.CqAttributes; import org.apache.geode.cache.query.CqAttributesFactory; -import org.apache.geode.cache.query.CqEvent; +import org.apache.geode.cache.query.CqResults; +import org.apache.geode.cache.query.Struct; public class GeodeKafkaSourceTask extends SourceTask { @@ -70,7 +69,9 @@ public class GeodeKafkaSourceTask extends SourceTask { geodeContext = new GeodeContext(); geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(), - geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity()); + geodeConnectorConfig.getSecurityClientAuthInit(), + geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), + geodeConnectorConfig.usesSecurity()); batchSize = geodeConnectorConfig.getBatchSize(); eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize()); @@ -140,7 +141,10 @@ public class GeodeKafkaSourceTask extends SourceTask { geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, isDurable); - eventBuffer.get().addAll((Collection<GeodeEvent>)events.stream().map(e -> new GeodeEvent(regionName, ((Struct)e).get("key"), ((Struct)e).get("value"))).collect(Collectors.toList())); + eventBuffer.get() + .addAll((Collection<GeodeEvent>) events.stream().map( + e -> new GeodeEvent(regionName, ((Struct) e).get("key"), ((Struct) e).get("value"))) + .collect(Collectors.toList())); } else { geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, diff --git a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java index 78673cd..e96796b 100644 --- a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java +++ b/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java @@ -26,29 +26,30 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { public static final ConfigDef SOURCE_CONFIG_DEF = configurables(); // Geode Configuration - public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientIdPrefix"; + public static final String DURABLE_CLIENT_ID_PREFIX = "durable-client-id-prefix"; public static final String DEFAULT_DURABLE_CLIENT_ID = ""; - public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout"; + public static final String DURABLE_CLIENT_TIME_OUT = "durable-client-timeout"; public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000"; - public static final String CQ_PREFIX = "cqPrefix"; + public static final String CQ_PREFIX = "cq-prefix"; public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka"; /** * Used as a key for source partitions */ public static final String REGION_PARTITION = "regionPartition"; - public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopics"; + public static final String REGION_TO_TOPIC_BINDINGS = "region-to-topics"; public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]"; - public static final String CQS_TO_REGISTER = "cqsToRegister"; //used internally so that only 1 task will register a cq + public static final String CQS_TO_REGISTER = "cqsToRegister"; // used internally so that only 1 + // task will register a cq - public static final String BATCH_SIZE = "geodeConnectorBatchSize"; + public static final String BATCH_SIZE = "geode-connector-batch-size"; public static final String DEFAULT_BATCH_SIZE = "100"; - public static final String QUEUE_SIZE = "geodeConnectorQueueSize"; + public static final String QUEUE_SIZE = "geode-connector-queue-size"; public static final String DEFAULT_QUEUE_SIZE = "10000"; - public static final String LOAD_ENTIRE_REGION = "loadEntireRegion"; + public static final String LOAD_ENTIRE_REGION = "load-entire-region"; public static final String DEFAULT_LOAD_ENTIRE_REGION = "false"; private final String durableClientId; @@ -81,14 +82,27 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { protected static ConfigDef configurables() { ConfigDef configDef = GeodeConnectorConfig.configurables(); - configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Internally created and used parameter, for signalling a task to register cqs"); - configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING, DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH, ""); - configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID, ConfigDef.Importance.LOW, ""); - configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT, ConfigDef.Importance.LOW, ""); - configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW, ""); - configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE, ConfigDef.Importance.MEDIUM, ""); - configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE, ConfigDef.Importance.MEDIUM, ""); - configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION, ConfigDef.Importance.MEDIUM, ""); + configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, + "Internally created and used parameter, for signalling a task to register cqs"); + configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING, + DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH, + "A comma separated list of \"one region to many topics\" mappings. Each mapping is surrounded by brackets. For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\""); + configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID, + ConfigDef.Importance.LOW, + "Prefix string for tasks to append to when registering as a durable client. If empty string, will not register as a durable client"); + configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT, + ConfigDef.Importance.LOW, + "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated"); + configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW, + "Prefix string to identify Connector cq's on a Geode server"); + configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE, + ConfigDef.Importance.MEDIUM, "Maximum number of records to return on each poll"); + configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE, + ConfigDef.Importance.MEDIUM, + "Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue "); + configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION, + ConfigDef.Importance.MEDIUM, + "Determines if we should queue up all entries that currently exist in a region. This allows us to copy existing region data. Will be replayed whenever a task needs to re-register a cq"); return configDef; } diff --git a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java index 11a00f9..5c63d98 100644 --- a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java +++ b/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java @@ -138,7 +138,8 @@ public class GeodeConnectorConfigTest { public void usesSecurityShouldBeTrueIfSecurityUserSet() { Map<String, String> props = new HashMap<>(); props.put(SECURITY_USER, "some user"); - GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); + GeodeConnectorConfig config = + new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); assertTrue(config.usesSecurity()); } @@ -146,14 +147,16 @@ public class GeodeConnectorConfigTest { public void usesSecurityShouldBeTrueIfSecurityClientAuthInitSet() { Map<String, String> props = new HashMap<>(); props.put(SECURITY_CLIENT_AUTH_INIT, "someclass"); - GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); + GeodeConnectorConfig config = + new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); assertTrue(config.usesSecurity()); } @Test public void usesSecurityShouldBeFalseIfSecurityUserAndSecurityClientAuthInitNotSet() { Map<String, String> props = new HashMap<>(); - GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); + GeodeConnectorConfig config = + new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); assertFalse(config.usesSecurity()); } @@ -161,14 +164,16 @@ public class GeodeConnectorConfigTest { public void securityClientAuthInitShouldBeSetIfUserIsSet() { Map<String, String> props = new HashMap<>(); props.put(SECURITY_USER, "some user"); - GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); + GeodeConnectorConfig config = + new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); assertNotNull(config.getSecurityClientAuthInit()); } @Test public void securityClientAuthInitShouldNotBeSetIfUserIsNotSetAndNotSpecificallySet() { Map<String, String> props = new HashMap<>(); - GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); + GeodeConnectorConfig config = + new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props); assertNull(config.getSecurityClientAuthInit()); } diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java index b4a7bbe..3afcde7 100644 --- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java +++ b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java @@ -21,8 +21,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.geode.kafka.sink.GeodeKafkaSink; -import org.geode.kafka.source.GeodeKafkaSource; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.ConnectorConfig; @@ -34,6 +32,8 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; import org.apache.kafka.connect.util.ConnectUtils; +import org.geode.kafka.sink.GeodeKafkaSink; +import org.geode.kafka.source.GeodeKafkaSource; public class WorkerAndHerderWrapper { diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java index 7901426..4fa7d81 100644 --- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java +++ b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java @@ -30,26 +30,20 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.geode.cache.query.CqAttributes; -import org.apache.geode.cache.query.CqResults; -import org.apache.geode.cache.query.SelectResults; -import org.apache.geode.cache.query.Struct; -import org.apache.geode.cache.query.internal.LinkedStructSet; -import org.apache.geode.cache.query.internal.ResultsBag; -import org.apache.geode.cache.query.internal.ResultsBag; -import org.apache.geode.cache.query.internal.StructImpl; -import org.apache.geode.cache.query.internal.types.StructTypeImpl; import org.geode.kafka.GeodeContext; import org.junit.Test; import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.query.CqAttributes; import org.apache.geode.cache.query.CqEvent; +import org.apache.geode.cache.query.CqResults; +import org.apache.geode.cache.query.Struct; +import org.apache.geode.cache.query.internal.ResultsBag; public class GeodeKafkaSourceTaskTest { @@ -149,7 +143,8 @@ public class GeodeKafkaSourceTaskTest { GeodeContext geodeContext = mock(GeodeContext.class); when(geodeContext.getClientCache()).thenReturn(clientCache); - when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class), anyBoolean())).thenReturn(new ResultsBag()); + when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class), + anyBoolean())).thenReturn(new ResultsBag()); Map<String, List<String>> regionToTopicsMap = new HashMap<>(); regionToTopicsMap.put("region1", new ArrayList());