This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new d2b2f54 Code improvements: d2b2f54 is described below commit d2b2f54cb77eed373d08e1ba88036f0847528ff5 Author: Nabarun Nag <n...@cs.wisc.edu> AuthorDate: Fri Apr 3 11:35:34 2020 -0700 Code improvements: * Exceptions being thrown as ConnectException * Return values are used. * static analyzer recommendations are implemented. --- .../geode/kafka/converter/JsonPdxConverter.java | 2 +- .../org/apache/geode/kafka/sink/BatchRecords.java | 8 +++---- .../geode/kafka/sink/GeodeKafkaSinkTask.java | 18 +++++++++++---- .../geode/kafka/sink/GeodeSinkConnectorConfig.java | 6 +---- .../geode/kafka/source/GeodeKafkaSourceTask.java | 27 ++++++++++++++-------- .../apache/geode/kafka/utils/EnumValidator.java | 2 +- .../kafka-connect-geode-version.properties | 2 +- .../kafka/converter/JsonPdxConverterDUnitTest.java | 6 ++--- .../kafka/source/GeodeKafkaSourceTaskTest.java | 5 ---- 9 files changed, 41 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java index e630b4e..03a6f0a 100644 --- a/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java +++ b/src/main/java/org/apache/geode/kafka/converter/JsonPdxConverter.java @@ -28,7 +28,7 @@ public class JsonPdxConverter implements Converter { public static final String JSON_TYPE_ANNOTATION = "\"@type\""; // Default value = false public static final String ADD_TYPE_ANNOTATION_TO_JSON = "add-type-annotation-to-json"; - private Map<String, String> internalConfig = new HashMap<>(); + final private Map<String, String> internalConfig = new HashMap<>(); @Override public void configure(Map<String, ?> configs, boolean isKey) { diff --git a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java index 7974abd..909cd7c 100644 --- 
a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java +++ b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java @@ -44,8 +44,8 @@ public class BatchRecords { public void addRemoveOperation(SinkRecord record) { // if a previous operation added to the update map - // let's just remove it so we don't do a put and then a remove - // depending on the order of operations (putAll then removeAll or removeAll or putAll)... + // let's just remove it so, we don't do a put and then a remove + // depending on the order of operations (putAll then removeAll or putAll)... // ...we could remove one of the if statements. if (updateMap.containsKey(record.key())) { updateMap.remove(record.key()); @@ -56,7 +56,7 @@ public class BatchRecords { public void addUpdateOperation(SinkRecord record, boolean nullValuesMeansRemove) { // it's assumed the records in are order - // if so if a previous value was in the remove list + // if so then a previous value was in the remove list // let's not remove it at the end of this operation if (nullValuesMeansRemove) { removeList.remove(record.key()); @@ -70,7 +70,7 @@ public class BatchRecords { region.putAll(updateMap); region.removeAll(removeList); } else { - logger.info("Unable to locate proxy region is null"); + logger.info("Unable to locate a proxy region. 
Value is null"); } } } diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java index 1f50ea4..873d89b 100644 --- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java +++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionExistsException; +import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientRegionShortcut; import org.apache.geode.kafka.GeodeContext; import org.apache.geode.kafka.Version; @@ -61,13 +62,20 @@ public class GeodeKafkaSinkTask extends SinkTask { GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props); configure(geodeConnectorConfig); geodeContext = new GeodeContext(); - geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), - geodeConnectorConfig.getSecurityClientAuthInit(), - geodeConnectorConfig.getSecurityUserName(), - geodeConnectorConfig.getSecurityPassword(), - geodeConnectorConfig.usesSecurity()); + ClientCache clientCache = + geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), + geodeConnectorConfig.getSecurityClientAuthInit(), + geodeConnectorConfig.getSecurityUserName(), + geodeConnectorConfig.getSecurityPassword(), + geodeConnectorConfig.usesSecurity()); + if (clientCache == null) { + throw new ConnectException("Unable start client cache in the sink task"); + } regionNameToRegion = createProxyRegions(topicToRegions.values()); } catch (Exception e) { + if (e instanceof ConnectException) { + throw e; + } throw new ConnectException("Unable to start sink task", e); } } diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java index 2ace395..7a60513 100644 --- 
a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java +++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java @@ -79,11 +79,7 @@ public class GeodeSinkConnectorConfig extends GeodeConnectorConfig { } public boolean getNullValueBehavior() { - if (nullValueBehavior.equals(GeodeSinkConfigurationConstants.NullValueBehavior.REMOVE)) { - return true; - } else { - return false; - } + return nullValueBehavior.equals(GeodeSinkConfigurationConstants.NullValueBehavior.REMOVE); } } diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java index 13e5b60..4ce7371 100644 --- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java @@ -29,6 +29,7 @@ import org.apache.kafka.connect.source.SourceTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.query.CqAttributes; import org.apache.geode.cache.query.CqAttributesFactory; import org.apache.geode.cache.query.CqQuery; @@ -72,14 +73,17 @@ public class GeodeKafkaSourceTask extends SourceTask { GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props); logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting"); geodeContext = new GeodeContext(); - geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), - geodeConnectorConfig.getDurableClientId(), - geodeConnectorConfig.getDurableClientTimeout(), - geodeConnectorConfig.getSecurityClientAuthInit(), - geodeConnectorConfig.getSecurityUserName(), - geodeConnectorConfig.getSecurityPassword(), - geodeConnectorConfig.usesSecurity()); - + ClientCache clientCache = + geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), + geodeConnectorConfig.getDurableClientId(), + 
geodeConnectorConfig.getDurableClientTimeout(), + geodeConnectorConfig.getSecurityClientAuthInit(), + geodeConnectorConfig.getSecurityUserName(), + geodeConnectorConfig.getSecurityPassword(), + geodeConnectorConfig.usesSecurity()); + if (clientCache == null) { + throw new ConnectException("Unable to create client cache in the source task"); + } batchSize = geodeConnectorConfig.getBatchSize(); eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize()); @@ -92,9 +96,12 @@ public class GeodeKafkaSourceTask extends SourceTask { loadEntireRegion); logger.info("Started Apache Geode source task"); } catch (Exception e) { - e.printStackTrace(); logger.error("Unable to start source task", e); - throw e; + if (e instanceof ConnectException) { + throw e; + } else { + throw new ConnectException(e); + } } } diff --git a/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java b/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java index 1c9e1ff..f7c2a61 100644 --- a/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java +++ b/src/main/java/org/apache/geode/kafka/utils/EnumValidator.java @@ -28,7 +28,7 @@ public class EnumValidator implements ConfigDef.Validator { } public static <T> EnumValidator in(T[] enumerators) { - Set<String> validValues = new HashSet<String>(enumerators.length); + Set<String> validValues = new HashSet<>(enumerators.length); for (T e : enumerators) { validValues.add(e.toString().toLowerCase()); } diff --git a/src/main/resources/kafka-connect-geode-version.properties b/src/main/resources/kafka-connect-geode-version.properties index 5ea51b3..5efe2ea 100644 --- a/src/main/resources/kafka-connect-geode-version.properties +++ b/src/main/resources/kafka-connect-geode-version.properties @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -version=${project.version} \ No newline at end of file +version=${project.version} \ No newline at end of file diff --git a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java index 46470c1..dfb990b 100644 --- a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java +++ b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java @@ -57,16 +57,16 @@ import org.apache.geode.test.dunit.rules.MemberVM; public class JsonPdxConverterDUnitTest { @Rule - public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); + final public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); @Rule public TestName testName = new TestName(); @ClassRule - public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder(); + final public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder(); @Rule - public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder(); + final public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder(); @BeforeClass public static void setup() diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java index 411316c..92c78e2 100644 --- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java +++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java @@ -71,11 +71,6 @@ public class GeodeKafkaSourceTaskTest { public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() { GeodeContext geodeContext = mock(GeodeContext.class); BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100); - CqResults<Object> fakeInitialResults = new ResultsBag(); - for (int i = 0; i < 10; i++) { - fakeInitialResults.add(mock(CqEvent.class)); - } - 
when(geodeContext.newCq(anyString(), anyString(), any(), anyBoolean())) .thenReturn(mock(CqQuery.class)); GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();