This is an automated email from the ASF dual-hosted git repository.

jasonhuynh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b5d659  All configurable parameters are now using '-' as delimiter instead of camelCase Added documentation to the configDef definitions Updated readme with latest configuration changes
5b5d659 is described below

commit 5b5d6596aa336ba7f01829f4ee2d2f023eb2b685
Author: Jason Huynh <huyn...@gmail.com>
AuthorDate: Thu Feb 13 09:28:00 2020 -0800

    All configurable parameters are now using '-' as delimiter instead of camelCase
    Added documentation to the configDef definitions
    Updated readme with latest configuration changes
---
 README.md                                          | 30 +++++++-------
 .../java/org/geode/kafka/GeodeConnectorConfig.java | 37 ++++++++++-------
 src/main/java/org/geode/kafka/GeodeContext.java    | 25 ++++++------
 .../kafka/security/SystemPropertyAuthInit.java     |  1 -
 .../java/org/geode/kafka/sink/GeodeKafkaSink.java  |  6 +--
 .../org/geode/kafka/sink/GeodeKafkaSinkTask.java   |  6 ++-
 .../geode/kafka/sink/GeodeSinkConnectorConfig.java | 15 ++++---
 .../org/geode/kafka/source/GeodeKafkaSource.java   |  9 +++--
 .../geode/kafka/source/GeodeKafkaSourceTask.java   | 16 +++++---
 .../kafka/source/GeodeSourceConnectorConfig.java   | 46 ++++++++++++++--------
 .../org/geode/kafka/GeodeConnectorConfigTest.java  | 15 ++++---
 .../org/geode/kafka/WorkerAndHerderWrapper.java    |  4 +-
 .../kafka/source/GeodeKafkaSourceTaskTest.java     | 17 +++-----
 13 files changed, 132 insertions(+), 95 deletions(-)

diff --git a/README.md b/README.md
index 39dd423..73953a1 100644
--- a/README.md
+++ b/README.md
@@ -26,7 +26,7 @@ plugin.path=(Path to your clone)/geode-kafka-connector/build/libs/
 name=geode-kafka-sink
 connector.class=GeodeKafkaSink
 tasks.max=1
-topicToRegions=[someTopicToSinkFrom:someRegionToConsume]
+topic-to-regions=[someTopicToSinkFrom:someRegionToConsume]
 topics=someTopicToSinkFrom
 locators=localHost[10334]
 ```
@@ -35,7 +35,7 @@ locators=localHost[10334]
 name=geode-kafka-source
 connector.class=GeodeKafkaSource
 tasks.max=1
-regionToTopics=[someRegionToSourceFrom:someTopicToConsume]
+region-to-topics=[someRegionToSourceFrom:someTopicToConsume]
 locators=localHost[10334]
 ```
 
@@ -47,27 +47,29 @@ bin/connect-standalone.sh config/connect-standalone.properties config/connect-ge
 #### GeodeKafkaSink Properties
 | Property | Required | Description| Default |
 |---|---|---|---|
-| locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
-|topicToRegions| yes| A comma separated list of "one topic to many regions" bindings.  Each binding is surrounded by brackets. For example "[topicName:regionName], [anotherTopic: regionName, anotherRegion]" | None.  This is required to be set in the source connector properties
+|locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
+|topic-to-regions| yes| A comma separated list of "one topic to many regions" bindings.  Each binding is surrounded by brackets. For example "[topicName:regionName], [anotherTopic: regionName, anotherRegion]" | "[gkcTopic:gkcRegion]"
 |security-client-auth-init| no | Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)
-|nullValuesMeanRemove | no | If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region | true |
+|null-values-mean-remove | no | If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region | true |
 
-* The topicToRegions property allows us to create mappings between topics  and regions.  A single one-to-one mapping would look similar to "[topic:region]" A one-to-many mapping can be made by comma separating the regions, for example "[topic:region1,region2]"  This is equivalent to both regions being consumers of the topic.
+* The topic-to-regions property allows us to create mappings between topics and regions.  A single one-to-one mapping would look similar to "[topic:region]".  A one-to-many mapping can be made by comma separating the regions, for example "[topic:region1,region2]".  This is equivalent to both regions being consumers of the topic.
 
 #### GeodeKafkaSource Properties
 | Property | Required| Description| Default |
 |---|---|---|---|
 | locators | no, but...| A comma separated string of locators that configure which locators to connect to | localhost[10334] |
-|regionToTopics| yes | A comma separated list of "one region to many topics" mappings.  Each mapping is surrounded by brackets.  For example "[regionName:topicName], "[anotherRegion: topicName, anotherTopic]" | None.  This is required to be set in the source connector properties|
+|region-to-topics| yes | A comma separated list of "one region to many topics" mappings.  Each mapping is surrounded by brackets.  For example "[regionName:topicName], [anotherRegion: topicName, anotherTopic]" | "[gkcRegion:gkcTopic]"|
 |security-client-auth-init| no | Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)
-|geodeConnectorBatchSize| no | Maximum number of records to return on each poll| 100 |
-|geodeConnectorQueueSize| no | Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue | 10000 |
-| loadEntireRegion| no| Determines if we should queue up all entries that currently exist in a region.  This allows us to copy existing region data.  Will be replayed whenever a task needs to re-register a cq| true |
-|durableClientIdPrefix| no | Prefix string for tasks to append to when registering as a durable client.  If empty string, will not register as a durable client | "" |
-| durableClientTimeout| no | How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated| 60000 |
-| cqPrefix| no| Prefix string to identify Connector cq's on a Geode server |cqForGeodeKafka |
+|security-username| no | Supply a username to be used to authenticate with Geode.  Will autoset the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user| null|
+|security-password| no | Supply a password to be used to authenticate with Geode| null|
+|geode-connector-batch-size| no | Maximum number of records to return on each poll| 100 |
+|geode-connector-queue-size| no | Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue | 10000 |
+| load-entire-region| no| Determines if we should queue up all entries that currently exist in a region.  This allows us to copy existing region data.  Will be replayed whenever a task needs to re-register a cq| true |
+|durable-client-id-prefix| no | Prefix string for tasks to append to when registering as a durable client.  If empty string, will not register as a durable client | "" |
+| durable-client-timeout| no | How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated| 60000 |
+| cq-prefix| no| Prefix string to identify Connector cqs on a Geode server |cqForGeodeKafka |
 
-* The regionToTopics property allows us to create mappings between regions and topics.  A single one-to-one mapping would look similar to "[region:topic]" A one-to-many mapping can be made by comma separating the topics, for example "[region:topic1,topic2]"  This is equivalent to the region be a producer for both topics 
+* The region-to-topics property allows us to create mappings between regions and topics.  A single one-to-one mapping would look similar to "[region:topic]".  A one-to-many mapping can be made by comma separating the topics, for example "[region:topic1,topic2]".  This is equivalent to the region being a producer for both topics.
 
 ---
 
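For concreteness, the two mapping properties documented above combine as follows. This is a minimal properties sketch with placeholder topic and region names, not values taken from this commit:

```
# hypothetical sink binding: regionA and regionB both consume ordersTopic
topic-to-regions=[ordersTopic:regionA,regionB]

# hypothetical source mapping: salesRegion produces to topicX and topicY
region-to-topics=[salesRegion:topicX,topicY]
```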
diff --git a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
index 2860a8f..cc151a4 100644
--- a/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/GeodeConnectorConfig.java
@@ -14,9 +14,6 @@
  */
 package org.geode.kafka;
 
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -24,6 +21,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
 public class GeodeConnectorConfig extends AbstractConfig {
 
   // GeodeKafka Specific Configuration
@@ -37,9 +37,10 @@ public class GeodeConnectorConfig extends AbstractConfig {
   public static final String LOCATORS = "locators";
   public static final String DEFAULT_LOCATOR = "localhost[10334]";
   public static final String SECURITY_CLIENT_AUTH_INIT = "security-client-auth-init";
-  private static final String DEFAULT_SECURITY_AUTH_INIT = "org.geode.kafka.security.SystemPropertyAuthInit";
+  private static final String DEFAULT_SECURITY_AUTH_INIT =
+      "org.geode.kafka.security.SystemPropertyAuthInit";
   public static final String SECURITY_USER = "security-username";
-  public static final String SECURITY_PASSWORD= "security-password";
+  public static final String SECURITY_PASSWORD = "security-password";
 
   protected final int taskId;
   protected List<LocatorHostPort> locatorHostPorts;
@@ -47,13 +48,13 @@ public class GeodeConnectorConfig extends AbstractConfig {
   private String securityUserName;
   private String securityPassword;
 
-  //Just for testing
+  // Just for testing
   protected GeodeConnectorConfig() {
     super(new ConfigDef(), new HashMap());
     taskId = 0;
   }
 
-  //Just for testing
+  // Just for testing
   protected GeodeConnectorConfig(Map<String, String> props) {
     super(new ConfigDef(), props);
     taskId = 0;
@@ -67,19 +68,27 @@ public class GeodeConnectorConfig extends AbstractConfig {
     securityUserName = getString(SECURITY_USER);
     securityPassword = getString(SECURITY_PASSWORD);
     securityClientAuthInit = getString(SECURITY_CLIENT_AUTH_INIT);
-    //if we registered a username/password instead of auth init, we should use the default auth init if one isn't specified
+    // if we registered a username/password instead of auth init, we should use the default auth
+    // init if one isn't specified
     if (usesSecurity()) {
-      securityClientAuthInit = securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT;
+      securityClientAuthInit =
+          securityClientAuthInit != null ? securityClientAuthInit : DEFAULT_SECURITY_AUTH_INIT;
     }
   }
 
   protected static ConfigDef configurables() {
     ConfigDef configDef = new ConfigDef();
-    configDef.define(TASK_ID, ConfigDef.Type.INT,  "0", ConfigDef.Importance.MEDIUM,"");
-    configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH, "");
-    configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
-    configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
-    configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "");
+    configDef.define(TASK_ID, ConfigDef.Type.INT, "0", ConfigDef.Importance.MEDIUM,
+        "Internally used to identify each task");
+    configDef.define(LOCATORS, ConfigDef.Type.STRING, DEFAULT_LOCATOR, ConfigDef.Importance.HIGH,
+        "A comma separated string of locators that configure which locators to connect to");
+    configDef.define(SECURITY_USER, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
+        "Supply a username to be used to authenticate with Geode.  Will autoset the security-client-auth-init to use a SystemPropertyAuthInit if one isn't supplied by the user");
+    configDef.define(SECURITY_PASSWORD, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
+        "Supply a password to be used to authenticate with Geode");
+    configDef.define(SECURITY_CLIENT_AUTH_INIT, ConfigDef.Type.STRING, null,
+        ConfigDef.Importance.HIGH,
+        "Point to class that implements the [AuthInitialize Interface](https://gemfire.docs.pivotal.io/99/geode/managing/security/implementing_authentication.html)");
     return configDef;
   }
 
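The `define()` calls above follow standard Kafka Connect `ConfigDef` usage: name, type, default, importance, and now a documentation string. A minimal, self-contained sketch of how such a definition is parsed; the class name and property values here are illustrative, not part of this commit:

```
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class ConfigDefSketch {
  public static void main(String[] args) {
    // Define a property with a default, an importance level, and doc text,
    // mirroring the define() calls in GeodeConnectorConfig.configurables().
    ConfigDef def = new ConfigDef();
    def.define("locators", ConfigDef.Type.STRING, "localhost[10334]",
        ConfigDef.Importance.HIGH,
        "A comma separated string of locators that configure which locators to connect to");

    // User-supplied properties override the default at parse time.
    Map<String, String> props = new HashMap<>();
    props.put("locators", "host1[10334],host2[10334]");
    AbstractConfig parsed = new AbstractConfig(def, props);
    System.out.println(parsed.getString("locators")); // host1[10334],host2[10334]
  }
}
```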
diff --git a/src/main/java/org/geode/kafka/GeodeContext.java b/src/main/java/org/geode/kafka/GeodeContext.java
index 6190ef2..9f30242 100644
--- a/src/main/java/org/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/geode/kafka/GeodeContext.java
@@ -14,10 +14,12 @@
  */
 package org.geode.kafka;
 
-import java.util.Collection;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
+import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
+
 import java.util.List;
 
-import org.apache.geode.cache.query.CqResults;
 import org.apache.kafka.connect.errors.ConnectException;
 
 import org.apache.geode.cache.client.ClientCache;
@@ -26,12 +28,9 @@ import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqException;
 import org.apache.geode.cache.query.CqExistsException;
 import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.CqResults;
 import org.apache.geode.cache.query.RegionNotFoundException;
 
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_CLIENT_AUTH_INIT;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_PASSWORD;
-import static org.geode.kafka.GeodeConnectorConfig.SECURITY_USER;
-
 public class GeodeContext {
 
   private ClientCache clientCache;
@@ -40,15 +39,18 @@ public class GeodeContext {
   public GeodeContext() {}
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String durableClientId, String durableClientTimeout, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
+      String durableClientId, String durableClientTimeout, String securityAuthInit,
+      String securityUserName, String securityPassword, boolean usesSecurity) {
     clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout,
         securityAuthInit, securityUserName, securityPassword, usesSecurity);
     return clientCache;
   }
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
-    clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName, securityPassword, usesSecurity);
+      String securityAuthInit, String securityUserName, String securityPassword,
+      boolean usesSecurity) {
+    clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName,
+        securityPassword, usesSecurity);
     return clientCache;
   }
 
@@ -57,7 +59,8 @@ public class GeodeContext {
   }
 
   public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName,
-      String durableClientTimeOut, String securityAuthInit, String securityUserName, String securityPassword, boolean usesSecurity) {
+      String durableClientTimeOut, String securityAuthInit, String securityUserName,
+      String securityPassword, boolean usesSecurity) {
     ClientCacheFactory ccf = new ClientCacheFactory();
 
     if (usesSecurity) {
@@ -93,7 +96,7 @@ public class GeodeContext {
   }
 
   public CqResults newCqWithInitialResults(String name, String query, CqAttributes cqAttributes,
-                                                   boolean isDurable) throws ConnectException {
+      boolean isDurable) throws ConnectException {
     try {
       CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable);
       return cq.executeWithInitialResults();
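A hedged usage sketch of the shorter `connectClient` overload reformatted above, based only on the signatures visible in this diff; the `LocatorHostPort(host, port)` constructor is an assumption, and a locator must be running locally for this to connect:

```
import java.util.Collections;
import java.util.List;

import org.apache.geode.cache.client.ClientCache;
import org.geode.kafka.GeodeContext;
import org.geode.kafka.LocatorHostPort;

public class GeodeContextSketch {
  public static void main(String[] args) {
    // Assumed (host, port) constructor for LocatorHostPort.
    List<LocatorHostPort> locators =
        Collections.singletonList(new LocatorHostPort("localhost", 10334));

    // Non-durable, unsecured connection: null auth init and credentials,
    // usesSecurity = false, per the shorter overload above.
    GeodeContext context = new GeodeContext();
    ClientCache cache = context.connectClient(locators, null, null, null, false);
    System.out.println("connected: " + !cache.isClosed());
  }
}
```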
diff --git a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
index cc525a2..6b646ee 100644
--- a/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
+++ b/src/main/java/org/geode/kafka/security/SystemPropertyAuthInit.java
@@ -19,7 +19,6 @@ import java.util.Properties;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.security.AuthInitialize;
 import org.apache.geode.security.AuthenticationFailedException;
-import org.geode.kafka.GeodeConnectorConfig;
 
 
 public class SystemPropertyAuthInit implements AuthInitialize {
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
index a8985c2..9ee5189 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSink.java
@@ -14,17 +14,17 @@
  */
 package org.geode.kafka.sink;
 
+import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.geode.kafka.GeodeConnectorConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
-
-import static org.geode.kafka.sink.GeodeSinkConnectorConfig.SINK_CONFIG_DEF;
+import org.geode.kafka.GeodeConnectorConfig;
 
 public class GeodeKafkaSink extends SinkConnector {
   private Map<String, String> sharedProps;
diff --git a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
index 7db384f..be44356 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -20,9 +20,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.geode.kafka.GeodeContext;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.geode.kafka.GeodeContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +61,9 @@ public class GeodeKafkaSinkTask extends SinkTask {
       configure(geodeConnectorConfig);
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
-          geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
+          geodeConnectorConfig.getSecurityClientAuthInit(),
+          geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(),
+          geodeConnectorConfig.usesSecurity());
+          geodeConnectorConfig.usesSecurity());
       regionNameToRegion = createProxyRegions(topicToRegions.values());
     } catch (Exception e) {
       logger.error("Unable to start sink task", e);
diff --git a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
index bb51b0e..a074220 100644
--- a/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -14,20 +14,19 @@
  */
 package org.geode.kafka.sink;
 
-import org.apache.kafka.common.config.ConfigDef;
-
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.geode.kafka.GeodeConnectorConfig;
 
 public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
   public static final ConfigDef SINK_CONFIG_DEF = configurables();
 
   // Used by sink
-  public static final String TOPIC_TO_REGION_BINDINGS = "topicToRegions";
+  public static final String TOPIC_TO_REGION_BINDINGS = "topic-to-regions";
   public static final String DEFAULT_TOPIC_TO_REGION_BINDING = "[gkcTopic:gkcRegion]";
-  public static final String NULL_VALUES_MEAN_REMOVE = "nullValuesMeanRemove";
+  public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove";
   public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
 
   private Map<String, List<String>> topicToRegions;
@@ -41,8 +40,12 @@ public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
 
   protected static ConfigDef configurables() {
     ConfigDef configDef = GeodeConnectorConfig.configurables();
-    configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING, DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH, "");
-    configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN, DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM, "");
+    configDef.define(TOPIC_TO_REGION_BINDINGS, ConfigDef.Type.STRING,
+        DEFAULT_TOPIC_TO_REGION_BINDING, ConfigDef.Importance.HIGH,
+        "A comma separated list of \"one topic to many regions\" bindings.  Each binding is surrounded by brackets. For example \"[topicName:regionName], [anotherTopic: regionName, anotherRegion]");
+    configDef.define(NULL_VALUES_MEAN_REMOVE, ConfigDef.Type.BOOLEAN,
+        DEFAULT_NULL_VALUES_MEAN_REMOVE, ConfigDef.Importance.MEDIUM,
+        "If set to true, when topics send a SinkRecord with a null value, we will convert to an operation similar to region.remove instead of putting a null value into the region");
     return configDef;
   }
 
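The `NULL_VALUES_MEAN_REMOVE` documentation string describes Kafka tombstone handling. A minimal sketch of that semantics, assuming stand-in names; `region`, `record`, and `nullValuesMeanRemove` are illustrative, not the connector's actual field names:

```
import org.apache.geode.cache.Region;
import org.apache.kafka.connect.sink.SinkRecord;

public class NullValueSketch {
  // Sketch of the documented behavior: a null-valued SinkRecord becomes a
  // region.remove when null-values-mean-remove=true, otherwise a plain put.
  static void apply(Region<Object, Object> region, SinkRecord record,
      boolean nullValuesMeanRemove) {
    if (nullValuesMeanRemove && record.value() == null) {
      region.remove(record.key()); // treat the Kafka tombstone as a delete
    } else {
      region.put(record.key(), record.value());
    }
  }
}
```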
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
index dac94f6..7b4445e 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSource.java
@@ -14,19 +14,19 @@
  */
 package org.geode.kafka.source;
 
+import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.geode.kafka.GeodeConnectorConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.util.ConnectorUtils;
-
-import static org.geode.kafka.source.GeodeSourceConnectorConfig.SOURCE_CONFIG_DEF;
+import org.geode.kafka.GeodeConnectorConfig;
 
 
 public class GeodeKafkaSource extends SourceConnector {
@@ -43,7 +43,8 @@ public class GeodeKafkaSource extends SourceConnector {
     List<Map<String, String>> taskConfigs = new ArrayList<>();
     List<String> bindings =
         GeodeConnectorConfig
-            .parseStringByComma(sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS));
+            .parseStringByComma(
+                sharedProps.get(GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS));
     List<List<String>> bindingsPerTask = ConnectorUtils.groupPartitions(bindings, maxTasks);
 
     for (int i = 0; i < maxTasks; i++) {
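For reference, `ConnectorUtils.groupPartitions` (used above) splits the parsed bindings into contiguous, as-even-as-possible chunks, one per task. A small sketch with illustrative values:

```
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.connect.util.ConnectorUtils;

public class GroupPartitionsSketch {
  public static void main(String[] args) {
    List<String> bindings = Arrays.asList("[r1:t1]", "[r2:t2]", "[r3:t3]");
    // Splits as evenly as possible; earlier groups receive the extras.
    List<List<String>> perTask = ConnectorUtils.groupPartitions(bindings, 2);
    System.out.println(perTask); // [[[r1:t1], [r2:t2]], [[r3:t3]]]
  }
}
```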
diff --git a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
index b1c289f..4acc081 100644
--- a/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -21,17 +21,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.apache.geode.cache.query.CqResults;
-import org.apache.geode.cache.query.Struct;
-import org.geode.kafka.GeodeContext;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.geode.kafka.GeodeContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesFactory;
-import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.Struct;
 
 public class GeodeKafkaSourceTask extends SourceTask {
 
@@ -70,7 +69,9 @@ public class GeodeKafkaSourceTask extends SourceTask {
       geodeContext = new GeodeContext();
       geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
           geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(),
-          geodeConnectorConfig.getSecurityClientAuthInit(), geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), geodeConnectorConfig.usesSecurity());
+          geodeConnectorConfig.getSecurityClientAuthInit(),
+          geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(),
+          geodeConnectorConfig.usesSecurity());
 
       batchSize = geodeConnectorConfig.getBatchSize();
       eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
@@ -140,7 +141,10 @@ public class GeodeKafkaSourceTask extends SourceTask {
             geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName),
                 "select * from /" + regionName, cqAttributes,
                 isDurable);
-        eventBuffer.get().addAll((Collection<GeodeEvent>)events.stream().map(e -> new GeodeEvent(regionName, ((Struct)e).get("key"), ((Struct)e).get("value"))).collect(Collectors.toList()));
+        eventBuffer.get()
+            .addAll((Collection<GeodeEvent>) events.stream().map(
+                e -> new GeodeEvent(regionName, ((Struct) e).get("key"), ((Struct) e).get("value")))
+                .collect(Collectors.toList()));
+                .collect(Collectors.toList()));
       } else {
         geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
             "select * from /" + regionName, cqAttributes,
diff --git a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
index 78673cd..e96796b 100644
--- a/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -26,29 +26,30 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
   public static final ConfigDef SOURCE_CONFIG_DEF = configurables();
 
   // Geode Configuration
-  public static final String DURABLE_CLIENT_ID_PREFIX = "durableClientIdPrefix";
+  public static final String DURABLE_CLIENT_ID_PREFIX = "durable-client-id-prefix";
   public static final String DEFAULT_DURABLE_CLIENT_ID = "";
-  public static final String DURABLE_CLIENT_TIME_OUT = "durableClientTimeout";
+  public static final String DURABLE_CLIENT_TIME_OUT = "durable-client-timeout";
   public static final String DEFAULT_DURABLE_CLIENT_TIMEOUT = "60000";
 
-  public static final String CQ_PREFIX = "cqPrefix";
+  public static final String CQ_PREFIX = "cq-prefix";
   public static final String DEFAULT_CQ_PREFIX = "cqForGeodeKafka";
 
   /**
    * Used as a key for source partitions
    */
   public static final String REGION_PARTITION = "regionPartition";
-  public static final String REGION_TO_TOPIC_BINDINGS = "regionToTopics";
+  public static final String REGION_TO_TOPIC_BINDINGS = "region-to-topics";
   public static final String DEFAULT_REGION_TO_TOPIC_BINDING = "[gkcRegion:gkcTopic]";
-  public static final String CQS_TO_REGISTER = "cqsToRegister"; //used internally so that only 1 task will register a cq
+  public static final String CQS_TO_REGISTER = "cqsToRegister"; // used internally so that only 1
+                                                                // task will register a cq
 
-  public static final String BATCH_SIZE = "geodeConnectorBatchSize";
+  public static final String BATCH_SIZE = "geode-connector-batch-size";
   public static final String DEFAULT_BATCH_SIZE = "100";
 
-  public static final String QUEUE_SIZE = "geodeConnectorQueueSize";
+  public static final String QUEUE_SIZE = "geode-connector-queue-size";
   public static final String DEFAULT_QUEUE_SIZE = "10000";
 
-  public static final String LOAD_ENTIRE_REGION = "loadEntireRegion";
+  public static final String LOAD_ENTIRE_REGION = "load-entire-region";
   public static final String DEFAULT_LOAD_ENTIRE_REGION = "false";
 
   private final String durableClientId;
@@ -81,14 +82,27 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
 
   protected static ConfigDef configurables() {
     ConfigDef configDef = GeodeConnectorConfig.configurables();
-    configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "Internally created and used parameter, for signalling a task to register cqs");
-    configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING, DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH, "");
-    configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID, ConfigDef.Importance.LOW, "");
-    configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT, ConfigDef.Importance.LOW, "");
-    configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW, "");
-    configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE, ConfigDef.Importance.MEDIUM, "");
-    configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE, ConfigDef.Importance.MEDIUM, "");
-    configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION, ConfigDef.Importance.MEDIUM, "");
+    configDef.define(CQS_TO_REGISTER, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH,
+        "Internally created and used parameter, for signalling a task to register cqs");
+    configDef.define(REGION_TO_TOPIC_BINDINGS, ConfigDef.Type.STRING,
+        DEFAULT_REGION_TO_TOPIC_BINDING, ConfigDef.Importance.HIGH,
+        "A comma separated list of \"one region to many topics\" mappings.  Each mapping is surrounded by brackets.  For example \"[regionName:topicName], \"[anotherRegion: topicName, anotherTopic]\"");
+    configDef.define(DURABLE_CLIENT_ID_PREFIX, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_ID,
+        ConfigDef.Importance.LOW,
+        "Prefix string for tasks to append to when registering as a durable client.  If empty string, will not register as a durable client");
+    configDef.define(DURABLE_CLIENT_TIME_OUT, ConfigDef.Type.STRING, DEFAULT_DURABLE_CLIENT_TIMEOUT,
+        ConfigDef.Importance.LOW,
+        "How long in milliseconds to persist values in Geode's durable queue before the queue is invalidated");
+    configDef.define(CQ_PREFIX, ConfigDef.Type.STRING, DEFAULT_CQ_PREFIX, ConfigDef.Importance.LOW,
+        "Prefix string to identify Connector cq's on a Geode server");
+    configDef.define(BATCH_SIZE, ConfigDef.Type.INT, DEFAULT_BATCH_SIZE,
+        ConfigDef.Importance.MEDIUM, "Maximum number of records to return on each poll");
+    configDef.define(QUEUE_SIZE, ConfigDef.Type.INT, DEFAULT_QUEUE_SIZE,
+        ConfigDef.Importance.MEDIUM,
+        "Maximum number of entries in the connector queue before backing up all Geode cq listeners sharing the task queue ");
+    configDef.define(LOAD_ENTIRE_REGION, ConfigDef.Type.BOOLEAN, DEFAULT_LOAD_ENTIRE_REGION,
+        ConfigDef.Importance.MEDIUM,
+        "Determines if we should queue up all entries that currently exist in a region.  This allows us to copy existing region data.  Will be replayed whenever a task needs to re-register a cq");
     return configDef;
   }
 
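Taken together, the renamed source keys defined above would appear in a connector properties file roughly like this; values are placeholders, mostly the defaults from this diff:

```
# hypothetical durable source configuration using the renamed keys
durable-client-id-prefix=geode-kafka-task
durable-client-timeout=60000
load-entire-region=true
geode-connector-batch-size=100
geode-connector-queue-size=10000
cq-prefix=cqForGeodeKafka
```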
diff --git a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
index 11a00f9..5c63d98 100644
--- a/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/org/geode/kafka/GeodeConnectorConfigTest.java
@@ -138,7 +138,8 @@ public class GeodeConnectorConfigTest {
   public void usesSecurityShouldBeTrueIfSecurityUserSet() {
     Map<String, String> props = new HashMap<>();
     props.put(SECURITY_USER, "some user");
-    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    GeodeConnectorConfig config =
+        new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertTrue(config.usesSecurity());
   }
 
@@ -146,14 +147,16 @@ public class GeodeConnectorConfigTest {
   public void usesSecurityShouldBeTrueIfSecurityClientAuthInitSet() {
     Map<String, String> props = new HashMap<>();
     props.put(SECURITY_CLIENT_AUTH_INIT, "someclass");
-    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    GeodeConnectorConfig config =
+        new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertTrue(config.usesSecurity());
   }
 
   @Test
   public void usesSecurityShouldBeFalseIfSecurityUserAndSecurityClientAuthInitNotSet() {
     Map<String, String> props = new HashMap<>();
-    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    GeodeConnectorConfig config =
+        new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertFalse(config.usesSecurity());
   }
 
@@ -161,14 +164,16 @@ public class GeodeConnectorConfigTest {
   public void securityClientAuthInitShouldBeSetIfUserIsSet() {
     Map<String, String> props = new HashMap<>();
     props.put(SECURITY_USER, "some user");
-    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    GeodeConnectorConfig config =
+        new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertNotNull(config.getSecurityClientAuthInit());
   }
 
   @Test
   public void securityClientAuthInitShouldNotBeSetIfUserIsNotSetAndNotSpecificallySet() {
     Map<String, String> props = new HashMap<>();
-    GeodeConnectorConfig config = new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
+    GeodeConnectorConfig config =
+        new GeodeConnectorConfig(GeodeConnectorConfig.configurables(), props);
     assertNull(config.getSecurityClientAuthInit());
   }
 
diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
index b4a7bbe..3afcde7 100644
--- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.geode.kafka.sink.GeodeKafkaSink;
-import org.geode.kafka.source.GeodeKafkaSource;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -34,6 +32,8 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
 import org.apache.kafka.connect.util.ConnectUtils;
+import org.geode.kafka.sink.GeodeKafkaSink;
+import org.geode.kafka.source.GeodeKafkaSource;
 
 public class WorkerAndHerderWrapper {
 
diff --git a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 7901426..4fa7d81 100644
--- a/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -30,26 +30,20 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.geode.cache.query.CqAttributes;
-import org.apache.geode.cache.query.CqResults;
-import org.apache.geode.cache.query.SelectResults;
-import org.apache.geode.cache.query.Struct;
-import org.apache.geode.cache.query.internal.LinkedStructSet;
-import org.apache.geode.cache.query.internal.ResultsBag;
-import org.apache.geode.cache.query.internal.ResultsBag;
-import org.apache.geode.cache.query.internal.StructImpl;
-import org.apache.geode.cache.query.internal.types.StructTypeImpl;
 import org.geode.kafka.GeodeContext;
 import org.junit.Test;
 
 import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.internal.ResultsBag;
 
 
 public class GeodeKafkaSourceTaskTest {
@@ -149,7 +143,8 @@ public class GeodeKafkaSourceTaskTest {
 
     GeodeContext geodeContext = mock(GeodeContext.class);
     when(geodeContext.getClientCache()).thenReturn(clientCache);
-    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class), anyBoolean())).thenReturn(new ResultsBag());
+    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class),
+        anyBoolean())).thenReturn(new ResultsBag());
+        anyBoolean())).thenReturn(new ResultsBag());
     Map<String, List<String>> regionToTopicsMap = new HashMap<>();
     regionToTopicsMap.put("region1", new ArrayList());
 
