This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5189d2d  Changing the project to a maven project
5189d2d is described below

commit 5189d2d91d8b22745ac7333c9cfa9185b513dab1
Author: Naburun Nag <n...@cs.wisc.edu>
AuthorDate: Tue Feb 25 11:01:00 2020 -0800

    Changing the project to a maven project
---
 .gitignore                                         | 192 ++++----------
 NOTICE                                             |   6 +
 logos/APACHE-20th-logo.png                         | Bin 0 -> 466136 bytes
 logos/Apache_Geode_logo_symbol.png                 | Bin 0 -> 13871 bytes
 logos/PoweredBYlogo.png                            | Bin 0 -> 84350 bytes
 pom.xml                                            | 293 +++++++++++++++++++++
 src/assembly/development.xml                       |  23 ++
 src/assembly/package.xml                           |  33 +++
 src/assembly/standalone.xml                        |  33 +++
 .../apache/geode/kafka/GeodeConnectorConfig.java   |  17 +-
 .../java/org/apache/geode/kafka/GeodeContext.java  |  17 +-
 .../org/apache/geode/kafka/LocatorHostPort.java    |   4 +-
 .../kafka/security/SystemPropertyAuthInit.java     |   2 +-
 .../org/apache/geode/kafka/sink/BatchRecords.java  |  12 +-
 .../geode/kafka/sink/GeodeKafkaSinkTask.java       |  43 +--
 .../geode/kafka/sink/GeodeSinkConnectorConfig.java |   2 +-
 .../org/apache/geode/kafka/source/GeodeEvent.java  |   6 +-
 .../geode/kafka/source/GeodeKafkaSource.java       |   4 -
 .../kafka/source/GeodeKafkaSourceListener.java     |   4 +-
 .../geode/kafka/source/GeodeKafkaSourceTask.java   |  38 ++-
 .../kafka/source/GeodeSourceConnectorConfig.java   |   4 +-
 .../kafka/source/SharedEventBufferSupplier.java    |   3 +-
 .../apache/geode/kafka/GeodeAsSinkDUnitTest.java   |  12 +-
 .../apache/geode/kafka/GeodeAsSourceDUnitTest.java |  10 +-
 .../geode/kafka/GeodeConnectorConfigTest.java      |  11 +-
 .../kafka/converter/JsonPdxConverterDUnitTest.java |   4 +-
 .../apache/geode/kafka/sink/BatchRecordsTest.java  |  27 +-
 .../geode/kafka/sink/GeodeKafkaSinkTaskTest.java   |  20 +-
 .../geode/kafka/sink/GeodeKafkaSinkTest.java       |   8 +-
 .../kafka/source/GeodeKafkaSourceTaskTest.java     |  83 ++----
 .../geode/kafka/source/GeodeKafkaSourceTest.java   |   8 +-
 .../geode/kafka/utilities/GeodeKafkaTestUtils.java |  10 +-
 .../apache/geode/kafka/utilities/JavaProcess.java  |  10 +-
 .../geode/kafka/utilities/KafkaLocalCluster.java   |   5 +-
 .../kafka/utilities/WorkerAndHerderCluster.java    |  12 +-
 .../kafka/utilities/WorkerAndHerderWrapper.java    |   5 +-
 .../kafka/utilities/ZooKeeperLocalCluster.java     |  19 +-
 37 files changed, 613 insertions(+), 367 deletions(-)

diff --git a/.gitignore b/.gitignore
index 2e856f5..89a3ea2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,13 +30,13 @@
 # When using Gradle or Maven with auto-import, you should exclude module files,
 # since they will be recreated, and may cause churn.  Uncomment if using
 # auto-import.
-# .idea/artifacts
-# .idea/compiler.xml
-# .idea/modules.xml
-# .idea/*.iml
-# .idea/modules
-# *.iml
-# *.ipr
+.idea/artifacts
+.idea/compiler.xml
+.idea/modules.xml
+.idea/*.iml
+.idea/modules
+*.iml
+*.ipr
 
 # CMake
 cmake-build-*/
@@ -71,105 +71,46 @@ fabric.properties
 # Android studio 3.1+ serialized cache file
 .idea/caches/build_file_checksums.ser
 
-### Gradle template
-.gradle
-/build/
+### Maven template
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+# https://github.com/takari/maven-wrapper#usage-without-binary-jar
+.mvn/wrapper/maven-wrapper.jar
 
-# Ignore Gradle GUI config
-gradle-app.setting
-
-# Avoid ignoring Gradle wrapper jar file (.jar files are usually ignored)
-!gradle-wrapper.jar
-
-# Cache of project
-.gradletasknamecache
-
-# # Work around https://youtrack.jetbrains.com/issue/IDEA-116898
-# gradle/wrapper/gradle-wrapper.properties
-
-### Eclipse template
-.metadata
-bin/
-tmp/
-*.tmp
-*.bak
-*.swp
-*~.nib
-local.properties
-.settings/
-.loadpath
-.recommenders
-
-# External tool builders
-.externalToolBuilders/
-
-# Locally stored "Eclipse launch configurations"
-*.launch
-
-# PyDev specific (Python IDE for Eclipse)
-*.pydevproject
-
-# CDT-specific (C/C++ Development Tooling)
-.cproject
-
-# CDT- autotools
-.autotools
-
-# Java annotation processor (APT)
-.factorypath
-
-# PDT-specific (PHP Development Tools)
-.buildpath
-
-# sbteclipse plugin
-.target
-
-# Tern plugin
-.tern-project
-
-# TeXlipse plugin
-.texlipse
-
-# STS (Spring Tool Suite)
-.springBeans
-
-# Code Recommenders
-.recommenders/
-
-# Annotation Processing
-.apt_generated/
-.apt_generated_test/
-
-# Scala IDE specific (Scala & Java development for Eclipse)
-.cache-main
-.scala_dependencies
-.worksheet
-
-### Windows template
-# Windows thumbnail cache files
-Thumbs.db
-Thumbs.db:encryptable
-ehthumbs.db
-ehthumbs_vista.db
-
-# Dump file
-*.stackdump
+### macOS template
+# General
+.DS_Store
+.AppleDouble
+.LSOverride
 
-# Folder config file
-[Dd]esktop.ini
+# Icon must end with two \r
+Icon
 
-# Recycle Bin used on file shares
-$RECYCLE.BIN/
+# Thumbnails
+._*
 
-# Windows Installer files
-*.cab
-*.msi
-*.msix
-*.msm
-*.msp
+# Files that might appear in the root of a volume
+.DocumentRevisions-V100
+.fseventsd
+.Spotlight-V100
+.TemporaryItems
+.Trashes
+.VolumeIcon.icns
+.com.apple.timemachine.donotpresent
 
-# Windows shortcuts
-*.lnk
+# Directories potentially created on remote AFP share
+.AppleDB
+.AppleDesktop
+Network Trash Folder
+Temporary Items
+.apdisk
 
 ### Java template
 # Compiled class file
@@ -196,43 +137,16 @@ $RECYCLE.BIN/
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
 
-### macOS template
-# General
-.DS_Store
-.AppleDouble
-.LSOverride
-
-# Icon must end with two \r
-Icon
-
-# Thumbnails
-._*
+.idea
+.idea/**
+.dunit/
+.dunit/**
+.dunit/*
 
-# Files that might appear in the root of a volume
-.DocumentRevisions-V100
-.fseventsd
-.Spotlight-V100
-.TemporaryItems
-.Trashes
-.VolumeIcon.icns
-.com.apple.timemachine.donotpresent
-
-# Directories potentially created on remote AFP share
-.AppleDB
-.AppleDesktop
-Network Trash Folder
-Temporary Items
-.apdisk
 
-### NetBeans template
-**/nbproject/private/
-**/nbproject/Makefile-*.mk
-**/nbproject/Package-*.bash
-build/
-nbbuild/
-dist/
-nbdist/
-.nb-gradle/
-
-.idea/
-**/dunit
+dunit/locator/locator*
+dunit/vm0/
+dunit/vm1/
+dunit/vm2/
+dunit/vm3/
+kafka-connect-geode.iml
\ No newline at end of file
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..5c832f9
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,6 @@
+Apache Geode
+Copyright 2016-2020 The Apache Software Foundation.
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
diff --git a/logos/APACHE-20th-logo.png b/logos/APACHE-20th-logo.png
new file mode 100644
index 0000000..524eb4d
Binary files /dev/null and b/logos/APACHE-20th-logo.png differ
diff --git a/logos/Apache_Geode_logo_symbol.png b/logos/Apache_Geode_logo_symbol.png
new file mode 100644
index 0000000..88f3996
Binary files /dev/null and b/logos/Apache_Geode_logo_symbol.png differ
diff --git a/logos/PoweredBYlogo.png b/logos/PoweredBYlogo.png
new file mode 100644
index 0000000..333a00b
Binary files /dev/null and b/logos/PoweredBYlogo.png differ
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..7a423ee
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,293 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache</groupId>
+    <artifactId>kafka-connect-geode</artifactId>
+    <packaging>jar</packaging>
+    <name>kafka-connect-geode</name>
+    <organization>
+        <name>Apache Software Foundation</name>
+        <url>https://geode.apache.org/</url>
+    </organization>
+    <url>https://geode.apache.org/</url>
+    <description>
+        Apache Geode Sink and Source connector for Kafka Connect
+    </description>
+    <licenses>
+        <license>
+            <name>Apache License 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <scm>
+        <connection>scm:git:https://github.com/apache/geode-kafka-connector.git</connection>
+        <developerConnection>scm:git:g...@github.com:apache/geode-kafka-connector.git</developerConnection>
+        <url>https://github.com/apache/geode-kafka-connector</url>
+        <tag>HEAD</tag>
+    </scm>
+
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <geode.core.version>1.9.0</geode.core.version>
+        <geode.cq.version>1.9.0</geode.cq.version>
+        <geode.dunit.version>1.9.0</geode.dunit.version>
+        <kafka.connect-api.version>2.3.1</kafka.connect-api.version>
+        <log4j.version>2.13.0</log4j.version>
+        <kafka_2.12.version>2.3.1</kafka_2.12.version>
+        <curator-framework.version>4.2.0</curator-framework.version>
+        <kafka-streams-test-utils.version>1.1.0</kafka-streams-test-utils.version>
+        <kafka.connect-runtime.version>2.3.1</kafka.connect-runtime.version>
+        <junit.version>4.12</junit.version>
+        <mockito.version>3.2.4</mockito.version>
+        <JUnitParams.version>1.1.1</JUnitParams.version>
+        <awaitility.version>3.1.6</awaitility.version>
+        <maven-plugin.version>3.8.1</maven-plugin.version>
+        <zookeeper.version>3.5.7</zookeeper.version>
+        <confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <name>Confluent</name>
+            <url>${confluent.maven.repo}</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <version>${kafka.connect-api.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.geode/geode-core -->
+        <dependency>
+            <groupId>org.apache.geode</groupId>
+            <artifactId>geode-core</artifactId>
+            <version>${geode.core.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.geode</groupId>
+            <artifactId>geode-cq</artifactId>
+            <version>${geode.cq.version}</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>${zookeeper.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.12</artifactId>
+            <version>${kafka_2.12.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams-test-utils</artifactId>
+            <version>${kafka-streams-test-utils.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator-framework.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-runtime -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-runtime</artifactId>
+            <version>${kafka.connect-runtime.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/junit/junit -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.mockito/mockito-core -->
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/pl.pragmatists/JUnitParams -->
+        <dependency>
+            <groupId>pl.pragmatists</groupId>
+            <artifactId>JUnitParams</artifactId>
+            <version>${JUnitParams.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.geode/geode-dunit -->
+        <dependency>
+            <groupId>org.apache.geode</groupId>
+            <artifactId>geode-dunit</artifactId>
+            <version>${geode.dunit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.awaitility/awaitility -->
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>io.confluent</groupId>
+                <version>0.10.0</version>
+                <artifactId>kafka-connect-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>kafka-connect</goal>
+                        </goals>
+                        <configuration>
+                            <title>Kafka Connect Apache Geode</title>
+                            <documentationUrl>https://geode.apache.org/docs/</documentationUrl>
+                            <description>
+                                The Apache Geode connector can be used to move data from Kafka to Geode and vice versa. The Sink takes data from a Kafka topic and puts it into a Geode region, while the Source moves any data inserted into a Geode region to Kafka topics.
+
+                                Apache Geode is an in-memory data grid that stores data in a key-value format. When Geode acts as a Sink, the key-value pair is extracted from the SinkRecord read from the Kafka topic and stored in a Geode region. When Geode acts as a Source, whenever a key-value pair is inserted into a region, an event containing the data is sent to the connector, which then places it on the Kafka topic.
+
+                                The mapping between regions and topics must be provided by the user, who must also ensure that the Apache Geode cluster is up and running with all the required regions, and that the topics exist in Kafka. The Apache Geode cluster must consist of at least one locator and one server.
+                            </description>
+                            <logo>logos/Apache_Geode_logo_symbol.png</logo>
+
+                            <supportProviderName>Apache Software Foundation / VMware</supportProviderName>
+                            <supportSummary>VMware, along with Apache Geode community members, supports the Apache Geode connector</supportSummary>
+                            <supportUrl>https://geode.apache.org/docs/</supportUrl>
+                            <supportLogo>logos/PoweredBYlogo.png</supportLogo>
+
+                            <ownerUsername>apache</ownerUsername>
+                            <ownerType>organization</ownerType>
+                            <ownerName>Apache Software Foundation</ownerName>
+                            <ownerUrl>https://www.apache.org/</ownerUrl>
+                            <ownerLogo>logos/APACHE-20th-logo.png</ownerLogo>
+
+                            <componentTypes>
+                                <componentType>sink</componentType>
+                                <componentType>source</componentType>
+                            </componentTypes>
+
+                            <tags>
+                                <tag>Apache</tag>
+                                <tag>Geode</tag>
+                                <tag>in-memory</tag>
+                                <tag>datagrid</tag>
+                                <tag>key-value</tag>
+                                <tag>GemFire</tag>
+                            </tags>
+
+                            <requirements>
+                                <requirement>Apache Geode 1.9 or above</requirement>
+                            </requirements>
+
+                            <deliveryGuarantee>
+                                <deliveryGuarantee>atLeastOnce</deliveryGuarantee>
+                            </deliveryGuarantee>
+
+                            <confluentControlCenterIntegration>true</confluentControlCenterIntegration>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>${maven-plugin.version}</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                    <compilerArgs>
+                        <arg>-Xlint:-processing</arg>
+                        <arg>-Xlint:all</arg>
+                        <arg>-Werror</arg>
+                    </compilerArgs>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/assembly/development.xml</descriptor>
+                        <descriptor>src/assembly/package.xml</descriptor>
+                    </descriptors>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+    </build>
+    <profiles>
+        <profile>
+            <id>standalone</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <descriptors>
+                                
<descriptor>src/assembly/standalone.xml</descriptor>
+                            </descriptors>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+</project>
\ No newline at end of file
diff --git a/src/assembly/development.xml b/src/assembly/development.xml
new file mode 100644
index 0000000..ae37239
--- /dev/null
+++ b/src/assembly/development.xml
@@ -0,0 +1,23 @@
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+    <!-- Assembles all dependencies in target/ directory so scripts can easily run in a development environment -->
+    <id>development</id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>share/java/kafka-connect-geode/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <useTransitiveFiltering>true</useTransitiveFiltering>
+            <excludes>
+                <!-- Exclude these jars during packaging.-->
+                <exclude>org.apache.kafka:connect-api</exclude>
+                <exclude>org.apache.kafka:connect-json</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/src/assembly/package.xml b/src/assembly/package.xml
new file mode 100644
index 0000000..9704986
--- /dev/null
+++ b/src/assembly/package.xml
@@ -0,0 +1,33 @@
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+    <!-- Assembles a packaged version targeting OS installation. -->
+    <id>package</id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>${project.basedir}</directory>
+            <outputDirectory>share/doc/kafka-connect-geode/</outputDirectory>
+            <includes>
+                <include>README*</include>
+                <include>LICENSE*</include>
+                <include>NOTICE*</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>share/java/kafka-connect-geode</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <useTransitiveFiltering>true</useTransitiveFiltering>
+            <excludes>
+                <!-- Exclude these jars during packaging.-->
+                <exclude>org.apache.kafka:connect-api</exclude>
+                <exclude>org.apache.kafka:connect-json</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/src/assembly/standalone.xml b/src/assembly/standalone.xml
new file mode 100644
index 0000000..9cf29e3
--- /dev/null
+++ b/src/assembly/standalone.xml
@@ -0,0 +1,33 @@
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+    <!-- Assembles a packaged jar that includes all dependencies. This still requires the kafka-connect
+     runtime, but allows running kafka-connect with a connector plugin using only a couple of jars. -->
+    <id>standalone</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>${project.basedir}</directory>
+            <outputDirectory>/</outputDirectory>
+            <includes>
+                <include>README*</include>
+                <include>LICENSE*</include>
+                <include>NOTICE*</include>
+                <include>licenses.html</include>
+                <include>licenses/</include>
+                <include>notices/</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
index 846b1e9..717fef6 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java
@@ -54,7 +54,7 @@ public class GeodeConnectorConfig extends AbstractConfig {
 
   // Just for testing
   protected GeodeConnectorConfig() {
-    super(new ConfigDef(), new HashMap());
+    super(new ConfigDef(), new HashMap<>());
     taskId = 0;
   }
 
@@ -104,14 +104,13 @@ public class GeodeConnectorConfig extends AbstractConfig {
    */
   public static Map<String, List<String>> parseRegionToTopics(String 
combinedBindings) {
     if (combinedBindings == null || combinedBindings.equals("")) {
-      return new HashMap();
+      return new HashMap<>();
     }
     List<String> bindings = parseBindings(combinedBindings);
-    return bindings.stream().map(binding -> {
-      String[] regionToTopicsArray = parseBinding(binding);
-      return regionToTopicsArray;
-    }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0],
-        regionToTopicsArray -> parseStringByComma(regionToTopicsArray[1])));
+    return bindings.stream().map(
+        GeodeConnectorConfig::parseBinding)
+        .collect(Collectors.toMap(regionToTopicsArray -> 
regionToTopicsArray[0],
+            regionToTopicsArray -> 
parseStringByComma(regionToTopicsArray[1])));
   }
 
   public static List<String> parseBindings(String bindings) {
@@ -133,11 +132,11 @@ public class GeodeConnectorConfig extends AbstractConfig {
   }
 
   public static List<String> parseStringBy(String string, String regex) {
-    return Arrays.stream(string.split(regex)).map((s) -> 
s.trim()).collect(Collectors.toList());
+    return 
Arrays.stream(string.split(regex)).map(String::trim).collect(Collectors.toList());
   }
 
   public static String reconstructString(Collection<String> strings) {
-    return strings.stream().collect(Collectors.joining("],["));
+    return String.join("],[", strings);
   }
 
   List<LocatorHostPort> parseLocators(String locators) {
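
For reference, parseRegionToTopics (refactored above to a method reference) consumes the
connector's [region:topic,...] binding format. A minimal sketch of the resulting mapping,
using illustrative region and topic names:

    import java.util.List;
    import java.util.Map;

    import org.apache.geode.kafka.GeodeConnectorConfig;

    public class BindingParseSketch {
      public static void main(String[] args) {
        // Each [region:topics] binding maps one region to a comma-separated list of topics.
        Map<String, List<String>> regionToTopics =
            GeodeConnectorConfig.parseRegionToTopics("[region1:topic1],[region2:topic2,topic3]");
        System.out.println(regionToTopics.get("region1")); // [topic1]
        System.out.println(regionToTopics.get("region2")); // [topic2, topic3]
      }
    }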
diff --git a/src/main/java/org/apache/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java
index 99ef11c..6be418f 100644
--- a/src/main/java/org/apache/geode/kafka/GeodeContext.java
+++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java
@@ -39,16 +39,16 @@ public class GeodeContext {
   public GeodeContext() {}
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String durableClientId, String durableClientTimeout, String 
securityAuthInit,
-      String securityUserName, String securityPassword, boolean usesSecurity) {
+                                   String durableClientId, String 
durableClientTimeout, String securityAuthInit,
+                                   String securityUserName, String 
securityPassword, boolean usesSecurity) {
     clientCache = createClientCache(locatorHostPortList, durableClientId, 
durableClientTimeout,
         securityAuthInit, securityUserName, securityPassword, usesSecurity);
     return clientCache;
   }
 
   public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList,
-      String securityAuthInit, String securityUserName, String 
securityPassword,
-      boolean usesSecurity) {
+                                   String securityAuthInit, String 
securityUserName, String securityPassword,
+                                   boolean usesSecurity) {
     clientCache = createClientCache(locatorHostPortList, "", "", 
securityAuthInit, securityUserName,
         securityPassword, usesSecurity);
     return clientCache;
@@ -59,8 +59,8 @@ public class GeodeContext {
   }
 
   public ClientCache createClientCache(List<LocatorHostPort> locators, String 
durableClientName,
-      String durableClientTimeOut, String securityAuthInit, String 
securityUserName,
-      String securityPassword, boolean usesSecurity) {
+                                       String durableClientTimeOut, String 
securityAuthInit, String securityUserName,
+                                       String securityPassword, boolean 
usesSecurity) {
     ClientCacheFactory ccf = new ClientCacheFactory();
 
     ccf.setPdxReadSerialized(true);
@@ -96,8 +96,9 @@ public class GeodeContext {
     }
   }
 
-  public CqResults newCqWithInitialResults(String name, String query, 
CqAttributes cqAttributes,
-      boolean isDurable) throws ConnectException {
+  public <E> CqResults<E> newCqWithInitialResults(String name, String query,
+                                                  CqAttributes cqAttributes,
+                                                  boolean isDurable) throws 
ConnectException {
     try {
       CqQuery cq = clientCache.getQueryService().newCq(name, query, 
cqAttributes, isDurable);
       return cq.executeWithInitialResults();
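
The sink and source tasks below now check the ClientCache returned by connectClient rather
than discarding it. A minimal sketch of the non-durable overload, assuming the default
locator port 10334 and that the security arguments may be null when usesSecurity is false:

    import java.util.Collections;

    import org.apache.geode.cache.client.ClientCache;
    import org.apache.geode.kafka.GeodeContext;
    import org.apache.geode.kafka.LocatorHostPort;

    public class ConnectSketch {
      public static void main(String[] args) {
        GeodeContext context = new GeodeContext();
        // Assumption: the security parameters are unused when usesSecurity is false.
        ClientCache cache = context.connectClient(
            Collections.singletonList(new LocatorHostPort("localhost", 10334)),
            null, null, null, false);
        if (cache == null) {
          throw new IllegalStateException("Unable to connect to the Apache Geode cluster");
        }
      }
    }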
diff --git a/src/main/java/org/apache/geode/kafka/LocatorHostPort.java b/src/main/java/org/apache/geode/kafka/LocatorHostPort.java
index d879d8e..124f76d 100644
--- a/src/main/java/org/apache/geode/kafka/LocatorHostPort.java
+++ b/src/main/java/org/apache/geode/kafka/LocatorHostPort.java
@@ -16,8 +16,8 @@ package org.apache.geode.kafka;
 
 public class LocatorHostPort {
 
-  private String hostName;
-  private int port;
+  private final String hostName;
+  private final int port;
 
   public LocatorHostPort(String hostName, int port) {
     this.hostName = hostName;
diff --git a/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
index 4f3e414..117b28e 100644
--- a/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
+++ b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java
@@ -24,7 +24,7 @@ import org.apache.geode.security.AuthenticationFailedException;
 public class SystemPropertyAuthInit implements AuthInitialize {
   @Override
   public Properties getCredentials(Properties securityProps, DistributedMember 
server,
-      boolean isPeer) throws AuthenticationFailedException {
+                                   boolean isPeer) throws 
AuthenticationFailedException {
     Properties extractedProperties = new Properties();
     extractedProperties.put("security-username", 
securityProps.get("security-username"));
     extractedProperties.put("security-password", 
securityProps.get("security-password"));
diff --git a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
index 9d64a03..7974abd 100644
--- a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
+++ b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java
@@ -15,9 +15,7 @@
 package org.apache.geode.kafka.sink;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
@@ -31,15 +29,15 @@ import org.apache.geode.cache.Region;
 public class BatchRecords {
   private static final Logger logger = 
LoggerFactory.getLogger(BatchRecords.class);
 
-  private Map updateMap;
-  private Collection removeList;
+  private final HashMap<Object, Object> updateMap;
+  private final ArrayList<Object> removeList;
 
   public BatchRecords() {
-    this(new HashMap(), new ArrayList());
+    this(new HashMap<>(), new ArrayList<>());
   }
 
   /** Used for tests **/
-  public BatchRecords(Map updateMap, Collection removeList) {
+  public BatchRecords(HashMap<Object, Object> updateMap, ArrayList<Object> 
removeList) {
     this.updateMap = updateMap;
     this.removeList = removeList;
   }
@@ -67,7 +65,7 @@ public class BatchRecords {
   }
 
 
-  public void executeOperations(Region region) {
+  public void executeOperations(Region<Object, Object> region) {
     if (region != null) {
       region.putAll(updateMap);
       region.removeAll(removeList);
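
With the collection types made concrete above, the flush semantics are easy to see:
executeOperations applies everything queued so far in two bulk region calls. A minimal
sketch using the test-only constructor, with illustrative keys and values:

    import java.util.ArrayList;
    import java.util.HashMap;

    import org.apache.geode.cache.Region;
    import org.apache.geode.kafka.sink.BatchRecords;

    public class BatchRecordsSketch {
      static void flush(Region<Object, Object> region) {
        HashMap<Object, Object> updates = new HashMap<>();
        updates.put("KEY0", "VALUE0");
        ArrayList<Object> removes = new ArrayList<>();
        removes.add("KEY1");
        BatchRecords batch = new BatchRecords(updates, removes); // test-only constructor
        batch.executeOperations(region); // region.putAll(updates); region.removeAll(removes)
      }
    }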
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
index daf2274..1688eb7 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionExistsException;
+import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.kafka.GeodeContext;
 
@@ -41,7 +43,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
 
   private GeodeContext geodeContext;
   private Map<String, List<String>> topicToRegions;
-  private Map<String, Region> regionNameToRegion;
+  private Map<String, Region<Object, Object>> regionNameToRegion;
   private boolean nullValuesMeansRemove = true;
 
   /**
@@ -59,10 +61,16 @@ public class GeodeKafkaSinkTask extends SinkTask {
       GeodeSinkConnectorConfig geodeConnectorConfig = new 
GeodeSinkConnectorConfig(props);
       configure(geodeConnectorConfig);
       geodeContext = new GeodeContext();
-      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
-          geodeConnectorConfig.getSecurityClientAuthInit(),
-          geodeConnectorConfig.getSecurityUserName(), 
geodeConnectorConfig.getSecurityPassword(),
-          geodeConnectorConfig.usesSecurity());
+      final ClientCache clientCache =
+          
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+              geodeConnectorConfig.getSecurityClientAuthInit(),
+              geodeConnectorConfig.getSecurityUserName(),
+              geodeConnectorConfig.getSecurityPassword(),
+              geodeConnectorConfig.usesSecurity());
+      if (clientCache == null) {
+        throw new ConnectException(
+            "Unable to create a client cache connected to the Apache Geode cluster");
+      }
       regionNameToRegion = createProxyRegions(topicToRegions.values());
     } catch (Exception e) {
       logger.error("Unable to start sink task", e);
@@ -72,19 +80,18 @@ public class GeodeKafkaSinkTask extends SinkTask {
 
   void configure(GeodeSinkConnectorConfig geodeConnectorConfig) {
     logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() 
+ " starting");
-    int taskId = geodeConnectorConfig.getTaskId();
     topicToRegions = geodeConnectorConfig.getTopicToRegions();
     nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove();
   }
 
   // For tests only
-  void setRegionNameToRegion(Map<String, Region> regionNameToRegion) {
+  void setRegionNameToRegion(Map<String, Region<Object, Object>> 
regionNameToRegion) {
     this.regionNameToRegion = regionNameToRegion;
   }
 
   @Override
   public void put(Collection<SinkRecord> records) {
-    put(records, new HashMap());
+    put(records, new HashMap<>());
   }
 
   void put(Collection<SinkRecord> records, Map<String, BatchRecords> 
batchRecordsMap) {
@@ -92,15 +99,12 @@ public class GeodeKafkaSinkTask extends SinkTask {
     for (SinkRecord record : records) {
       updateBatchForRegionByTopic(record, batchRecordsMap);
     }
-    batchRecordsMap.entrySet().stream().forEach((entry) -> {
-      String region = entry.getKey();
-      BatchRecords batchRecords = entry.getValue();
-      batchRecords.executeOperations(regionNameToRegion.get(region));
-    });
+    batchRecordsMap.forEach(
+        (region, batchRecords) -> 
batchRecords.executeOperations(regionNameToRegion.get(region)));
   }
 
   private void updateBatchForRegionByTopic(SinkRecord sinkRecord,
-      Map<String, BatchRecords> batchRecordsMap) {
+                                           Map<String, BatchRecords> 
batchRecordsMap) {
     Collection<String> regionsToUpdate = 
topicToRegions.get(sinkRecord.topic());
     for (String region : regionsToUpdate) {
       updateBatchRecordsForRecord(sinkRecord, batchRecordsMap, region);
@@ -108,7 +112,7 @@ public class GeodeKafkaSinkTask extends SinkTask {
   }
 
   private void updateBatchRecordsForRecord(SinkRecord record,
-      Map<String, BatchRecords> batchRecordsMap, String region) {
+                                           Map<String, BatchRecords> 
batchRecordsMap, String region) {
     BatchRecords batchRecords = batchRecordsMap.get(region);
     if (batchRecords == null) {
       batchRecords = new BatchRecords();
@@ -126,13 +130,14 @@ public class GeodeKafkaSinkTask extends SinkTask {
     }
   }
 
-  private Map<String, Region> createProxyRegions(Collection<List<String>> 
regionNames) {
+  private Map<String, Region<Object, Object>> createProxyRegions(
+      Collection<List<String>> regionNames) {
     List<String> flat = 
regionNames.stream().flatMap(List::stream).collect(Collectors.toList());
-    return flat.stream().map(regionName -> createProxyRegion(regionName))
-        .collect(Collectors.toMap(region -> region.getName(), region -> 
region));
+    return flat.stream().map(this::createProxyRegion)
+        .collect(Collectors.toMap(Region::getName, region -> region));
   }
 
-  private Region createProxyRegion(String regionName) {
+  private Region<Object, Object> createProxyRegion(String regionName) {
     try {
       return 
geodeContext.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
           .create(regionName);
diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
index b12eb8f..cd49778 100644
--- a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java
@@ -30,7 +30,7 @@ public class GeodeSinkConnectorConfig extends GeodeConnectorConfig {
   public static final String NULL_VALUES_MEAN_REMOVE = 
"null-values-mean-remove";
   public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true";
 
-  private Map<String, List<String>> topicToRegions;
+  private final Map<String, List<String>> topicToRegions;
   private final boolean nullValuesMeanRemove;
 
   public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) {
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java b/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java
index 654b05a..b801356 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java
@@ -21,9 +21,9 @@ import org.apache.geode.cache.query.CqEvent;
  */
 public class GeodeEvent {
 
-  private String regionName;
-  private Object key;
-  private Object value;
+  private final String regionName;
+  private final Object key;
+  private final Object value;
 
   public GeodeEvent(String regionName, CqEvent event) {
     this(regionName, event.getKey(), event.getNewValue());
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
index 89054a6..ca6dd0c 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java
@@ -79,8 +79,4 @@ public class GeodeKafkaSource extends SourceConnector {
     // TODO
     return AppInfoParser.getVersion();
   }
-
-  public Map<String, String> getSharedProps() {
-    return sharedProps;
-  }
 }
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
index 1d16404..8a54766 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java
@@ -26,8 +26,8 @@ class GeodeKafkaSourceListener implements CqStatusListener {
 
   private static final Logger logger = 
LoggerFactory.getLogger(GeodeKafkaSourceListener.class);
 
-  public String regionName;
-  private EventBufferSupplier eventBufferSupplier;
+  public final String regionName;
+  private final EventBufferSupplier eventBufferSupplier;
   private boolean initialResultsLoaded;
 
   public GeodeKafkaSourceListener(EventBufferSupplier eventBufferSupplier, 
String regionName) {
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
index 8f26c1e..182efff 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java
@@ -21,13 +21,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqQuery;
 import org.apache.geode.cache.query.CqResults;
 import org.apache.geode.cache.query.Struct;
 import org.apache.geode.kafka.GeodeContext;
@@ -66,12 +69,18 @@ public class GeodeKafkaSourceTask extends SourceTask {
       GeodeSourceConnectorConfig geodeConnectorConfig = new 
GeodeSourceConnectorConfig(props);
       logger.debug("GeodeKafkaSourceTask id:" + 
geodeConnectorConfig.getTaskId() + " starting");
       geodeContext = new GeodeContext();
-      geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
-          geodeConnectorConfig.getDurableClientId(), 
geodeConnectorConfig.getDurableClientTimeout(),
-          geodeConnectorConfig.getSecurityClientAuthInit(),
-          geodeConnectorConfig.getSecurityUserName(), 
geodeConnectorConfig.getSecurityPassword(),
-          geodeConnectorConfig.usesSecurity());
-
+      final ClientCache clientCache =
+          
geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(),
+              geodeConnectorConfig.getDurableClientId(),
+              geodeConnectorConfig.getDurableClientTimeout(),
+              geodeConnectorConfig.getSecurityClientAuthInit(),
+              geodeConnectorConfig.getSecurityUserName(),
+              geodeConnectorConfig.getSecurityPassword(),
+              geodeConnectorConfig.usesSecurity());
+      if (clientCache == null) {
+        throw new ConnectException(
+            "Unable to create a client cache connected to the Apache Geode cluster");
+      }
       batchSize = geodeConnectorConfig.getBatchSize();
       eventBufferSupplier = new 
SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize());
 
@@ -90,7 +99,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
   }
 
   @Override
-  public List<SourceRecord> poll() throws InterruptedException {
+  public List<SourceRecord> poll() {
     ArrayList<SourceRecord> records = new ArrayList<>(batchSize);
     ArrayList<GeodeEvent> events = new ArrayList<>(batchSize);
     if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) {
@@ -114,7 +123,7 @@ public class GeodeKafkaSourceTask extends SourceTask {
   }
 
   void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, 
GeodeContext geodeContext,
-      EventBufferSupplier eventBuffer, String cqPrefix, boolean 
loadEntireRegion) {
+                      EventBufferSupplier eventBuffer, String cqPrefix, 
boolean loadEntireRegion) {
     boolean isDurable = geodeConnectorConfig.isDurable();
     int taskId = geodeConnectorConfig.getTaskId();
     for (String region : geodeConnectorConfig.getCqsToRegister()) {
@@ -127,26 +136,29 @@ public class GeodeKafkaSourceTask extends SourceTask {
   }
 
   GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, 
int taskId,
-      EventBufferSupplier eventBuffer, String regionName, String cqPrefix, 
boolean loadEntireRegion,
-      boolean isDurable) {
+                                                    EventBufferSupplier 
eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion,
+                                                    boolean isDurable) {
     CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
     GeodeKafkaSourceListener listener = new 
GeodeKafkaSourceListener(eventBuffer, regionName);
     cqAttributesFactory.addCqListener(listener);
     CqAttributes cqAttributes = cqAttributesFactory.create();
     try {
       if (loadEntireRegion) {
-        CqResults events =
+        CqResults<?> events =
             geodeContext.newCqWithInitialResults(generateCqName(taskId, 
cqPrefix, regionName),
                 "select * from /" + regionName, cqAttributes,
                 isDurable);
         eventBuffer.get()
-            .addAll((Collection<GeodeEvent>) events.stream().map(
+            .addAll(events.stream().map(
                 e -> new GeodeEvent(regionName, ((Struct) e).get("key"), 
((Struct) e).get("value")))
                 .collect(Collectors.toList()));
       } else {
-        geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
+        final CqQuery cqQuery = geodeContext.newCq(generateCqName(taskId, 
cqPrefix, regionName),
             "select * from /" + regionName, cqAttributes,
             isDurable);
+        if (cqQuery == null) {
+          throw new ConnectException("Unable to execute queries on the Apache Geode server");
+        }
       }
     } finally {
       listener.signalInitialResultsLoaded();
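
installListenersToRegion now fails fast on both continuous-query paths. A minimal sketch
of the initial-results path it wraps, with an illustrative CQ name and region; each
initial result is a Struct holding a key and a value:

    import org.apache.geode.cache.query.CqAttributes;
    import org.apache.geode.cache.query.CqResults;
    import org.apache.geode.cache.query.Struct;
    import org.apache.geode.kafka.GeodeContext;

    public class CqSketch {
      static void registerWithInitialResults(GeodeContext geodeContext, CqAttributes cqAttributes) {
        CqResults<?> events = geodeContext.newCqWithInitialResults(
            "exampleCq", "select * from /exampleRegion", cqAttributes, false);
        for (Object e : events) {
          Struct struct = (Struct) e;
          System.out.println(struct.get("key") + " -> " + struct.get("value"));
        }
      }
    }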
diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
index a004f23..01294c9 100644
--- a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
+++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java
@@ -60,8 +60,8 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig {
   private final int batchSize;
   private final int queueSize;
 
-  private Map<String, List<String>> regionToTopics;
-  private Collection<String> cqsToRegister;
+  private final Map<String, List<String>> regionToTopics;
+  private final Collection<String> cqsToRegister;
 
   public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) {
     super(SOURCE_CONFIG_DEF, connectorProperties);
diff --git a/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java b/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java
index b3d1268..dbbf902 100644
--- a/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java
+++ b/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java
@@ -25,7 +25,7 @@ public class SharedEventBufferSupplier implements EventBufferSupplier {
     recreateEventBufferIfNeeded(size);
   }
 
-  BlockingQueue recreateEventBufferIfNeeded(int size) {
+  void recreateEventBufferIfNeeded(int size) {
     if (eventBuffer == null || (eventBuffer.size() + 
eventBuffer.remainingCapacity()) != size) {
       synchronized (GeodeKafkaSource.class) {
         if (eventBuffer == null || (eventBuffer.size() + 
eventBuffer.remainingCapacity()) != size) {
@@ -37,7 +37,6 @@ public class SharedEventBufferSupplier implements EventBufferSupplier {
         }
       }
     }
-    return eventBuffer;
   }
 
   /**
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
index 668681d..a2040ac 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -54,16 +54,16 @@ import org.apache.geode.test.dunit.rules.MemberVM;
 @RunWith(Parameterized.class)
 public class GeodeAsSinkDUnitTest {
   @Rule
-  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+  public final ClusterStartupRule clusterStartupRule = new 
ClusterStartupRule(3);
 
   @Rule
-  public TestName testName = new TestName();
+  public final TestName testName = new TestName();
 
   @ClassRule
-  public static TemporaryFolder temporaryFolderForZooKeeper = new 
TemporaryFolder();
+  public static final TemporaryFolder temporaryFolderForZooKeeper = new 
TemporaryFolder();
 
   @Rule
-  public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
+  public final TemporaryFolder temporaryFolderForOffset = new 
TemporaryFolder();
 
   @BeforeClass
   public static void setup()
@@ -149,11 +149,11 @@ public class GeodeAsSinkDUnitTest {
       Producer<String, String> producer = createProducer();
 
       for (int i = 0; i < NUM_EVENT; i++) {
-        producer.send(new ProducerRecord(sinkTopic, "KEY" + i, "VALUE" + i));
+        producer.send(new ProducerRecord<>(sinkTopic, "KEY" + i, "VALUE" + i));
       }
 
       client1.invoke(() -> {
-        Region region = 
ClusterStartupRule.getClientCache().getRegion(sinkRegion);
+        Region<Object, Object> region = 
ClusterStartupRule.getClientCache().getRegion(sinkRegion);
         await().atMost(10, TimeUnit.SECONDS)
             .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
       });
diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
index f00965c..053f4ce 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java
@@ -52,16 +52,16 @@ import org.apache.geode.test.dunit.rules.MemberVM;
 @RunWith(Parameterized.class)
 public class GeodeAsSourceDUnitTest {
   @Rule
-  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+  public final ClusterStartupRule clusterStartupRule = new 
ClusterStartupRule(3);
 
   @Rule
-  public TestName testName = new TestName();
+  public final TestName testName = new TestName();
 
   @ClassRule
-  public static TemporaryFolder temporaryFolderForZooKeeper = new 
TemporaryFolder();
+  public static final TemporaryFolder temporaryFolderForZooKeeper = new 
TemporaryFolder();
 
   @Rule
-  public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
+  public final TemporaryFolder temporaryFolderForOffset = new 
TemporaryFolder();
 
   @BeforeClass
   public static void setup()
@@ -148,7 +148,7 @@ public class GeodeAsSourceDUnitTest {
 
       // Insert data into the Apache Geode source from the client
       client1.invoke(() -> {
-        Region region = 
ClusterStartupRule.getClientCache().getRegion(sourceRegion);
+        Region<Object, Object> region = 
ClusterStartupRule.getClientCache().getRegion(sourceRegion);
         for (int i = 0; i < NUM_EVENT; i++) {
           region.put("KEY" + i, "VALUE" + i);
         }
diff --git a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
index 9e46478..4461dc4 100644
--- a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
+++ b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java
@@ -15,6 +15,7 @@
 package org.apache.geode.kafka;
 
 
+import static org.apache.geode.kafka.GeodeConnectorConfig.parseStringByComma;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
@@ -39,8 +40,7 @@ public class GeodeConnectorConfigTest {
 
   @Test
   public void parseRegionNamesShouldSplitOnComma() {
-    GeodeConnectorConfig config = new GeodeConnectorConfig();
-    List<String> regionNames = 
config.parseStringByComma("region1,region2,region3,region4");
+    List<String> regionNames = 
parseStringByComma("region1,region2,region3,region4");
     assertEquals(4, regionNames.size());
     assertThat(true, allOf(is(regionNames.contains("region1")), 
is(regionNames.contains("region2")),
         is(regionNames.contains("region3")), 
is(regionNames.contains("region4"))));
@@ -48,8 +48,7 @@ public class GeodeConnectorConfigTest {
 
   @Test
   public void parseRegionNamesShouldChomp() {
-    GeodeConnectorConfig config = new GeodeConnectorConfig();
-    List<String> regionNames = config.parseStringByComma("region1, region2, 
region3,region4");
+    List<String> regionNames = parseStringByComma("region1, region2, 
region3,region4");
     assertEquals(4, regionNames.size());
     assertThat(true,
         allOf(is(regionNames.contains("region1")),
@@ -118,7 +117,7 @@ public class GeodeConnectorConfigTest {
   public void 
configurationShouldReturnRegionToTopicsMappingWhenParseRegionToTopics(String 
value) {
     Map<String, List<String>> regionToTopics = 
GeodeConnectorConfig.parseRegionToTopics(value);
     assertEquals(2, regionToTopics.size());
-    assertTrue(regionToTopics.get("region1") != null);
+    assertNotNull(regionToTopics.get("region1"));
     assertEquals(1, regionToTopics.get("region1").size());
     assertTrue(regionToTopics.get("region1").contains("topic1"));
   }
@@ -127,7 +126,7 @@ public class GeodeConnectorConfigTest {
   public void regionToTopicParsingShouldParseCorrectlyWithASingleBinding() {
     Map<String, List<String>> regionToTopics =
         GeodeConnectorConfig.parseRegionToTopics("[region1:topic1]");
-    assertTrue(regionToTopics.get("region1") != null);
+    assertNotNull(regionToTopics.get("region1"));
     assertEquals(1, regionToTopics.get("region1").size());
     assertTrue(regionToTopics.get("region1").contains("topic1"));
   }
diff --git a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
index d62ccc1..e05a962 100644
--- a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
+++ b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java
@@ -48,6 +48,7 @@ import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.kafka.utilities.KafkaLocalCluster;
 import org.apache.geode.kafka.utilities.TestObject;
 import org.apache.geode.kafka.utilities.WorkerAndHerderCluster;
+import org.apache.geode.pdx.FieldType;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.pdx.PdxInstanceFactory;
 import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
@@ -170,8 +171,7 @@ public class JsonPdxConverterDUnitTest {
             .forEach(field -> {
               try {
                 Object value = field.get(originalObject);
-                Class type = field.getType();
-                instanceFactory.writeField(field.getName(), value, type);
+                instanceFactory.writeField(field.getName(), value, Object.class);
               } catch (IllegalAccessException ignore) {
               }
             });
diff --git a/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
index 9471f48..d37c0eb 100644
--- a/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java
@@ -20,19 +20,20 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Collection;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
 
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Test;
 
 import org.apache.geode.cache.Region;
 
+@SuppressWarnings("unchecked")
 public class BatchRecordsTest {
   @Test
   public void updatingARecordShouldRemoveFromTheRemoveListIfNullValuesIsRemoveBooleanIsSet() {
-    Map updates = mock(Map.class);
-    Collection removes = mock(Collection.class);
+    HashMap<Object, Object> updates = mock(HashMap.class);
+    ArrayList<Object> removes = mock(ArrayList.class);
     when(removes.contains(any())).thenReturn(true);
     BatchRecords records = new BatchRecords(updates, removes);
     SinkRecord sinkRecord = mock(SinkRecord.class);
@@ -42,8 +43,8 @@ public class BatchRecordsTest {
 
   @Test
   public void updatingARecordShouldAddToTheUpdateMap() {
-    Map updates = mock(Map.class);
-    Collection removes = mock(Collection.class);
+    HashMap<Object, Object> updates = mock(HashMap.class);
+    ArrayList<Object> removes = mock(ArrayList.class);
     when(removes.contains(any())).thenReturn(false);
     BatchRecords records = new BatchRecords(updates, removes);
     SinkRecord sinkRecord = mock(SinkRecord.class);
@@ -53,8 +54,8 @@ public class BatchRecordsTest {
 
   @Test
   public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() {
-    Map updates = mock(Map.class);
-    Collection removes = mock(Collection.class);
+    HashMap<Object, Object> updates = mock(HashMap.class);
+    ArrayList<Object> removes = mock(ArrayList.class);
     when(removes.contains(any())).thenReturn(true);
     BatchRecords records = new BatchRecords(updates, removes);
     SinkRecord sinkRecord = mock(SinkRecord.class);
@@ -65,8 +66,8 @@ public class BatchRecordsTest {
 
   @Test
   public void removingARecordShouldRemoveFromTheUpdateMapIfKeyIsPresent() {
-    Map updates = mock(Map.class);
-    Collection removes = mock(Collection.class);
+    HashMap<Object, Object> updates = mock(HashMap.class);
+    ArrayList<Object> removes = mock(ArrayList.class);
     when(updates.containsKey(any())).thenReturn(true);
     BatchRecords records = new BatchRecords(updates, removes);
     SinkRecord sinkRecord = mock(SinkRecord.class);
@@ -76,8 +77,8 @@ public class BatchRecordsTest {
 
   @Test
   public void removingARecordAddToTheRemoveCollection() {
-    Map updates = mock(Map.class);
-    Collection removes = mock(Collection.class);
+    HashMap<Object, Object> updates = mock(HashMap.class);
+    ArrayList<Object> removes = mock(ArrayList.class);
     BatchRecords records = new BatchRecords(updates, removes);
     SinkRecord sinkRecord = mock(SinkRecord.class);
     records.addRemoveOperation(sinkRecord);
@@ -86,7 +87,7 @@ public class BatchRecordsTest {
 
   @Test
   public void executeOperationsShouldInvokePutAllAndRemoveAll() {
-    Region region = mock(Region.class);
+    Region<Object, Object> region = mock(Region.class);
     BatchRecords records = new BatchRecords();
     records.executeOperations(region);
     verify(region, times(1)).putAll(any());
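
Editor's note: the BatchRecordsTest hunks above swap raw Map/Collection mocks for parameterized HashMap/ArrayList mocks, which is why the class-level @SuppressWarnings("unchecked") was added: Mockito's mock(Class) returns a raw type, so assigning it to a parameterized variable is an unchecked conversion. A minimal, self-contained sketch of the pattern (the class and variable names are illustrative, not the connector's API):

    import static org.mockito.Mockito.any;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.when;

    import java.util.ArrayList;
    import java.util.HashMap;

    public class GenericMockSketch {
      @SuppressWarnings("unchecked") // mock(HashMap.class) returns a raw HashMap
      public static void main(String[] args) {
        HashMap<Object, Object> updates = mock(HashMap.class);
        ArrayList<Object> removes = mock(ArrayList.class);
        when(removes.contains(any())).thenReturn(true);
        updates.put("key", "value");
        verify(updates).put("key", "value");          // interaction was recorded
        System.out.println(removes.contains("x"));    // true, per the stub above
      }
    }
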
diff --git a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
index c71d7ba..85b9954 100644
--- a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java
@@ -35,11 +35,11 @@ import org.apache.geode.kafka.GeodeConnectorConfig;
 
 public class GeodeKafkaSinkTaskTest {
 
-  private HashMap<String, String> createTestSinkProps(boolean nullMeansRemove) {
+  private HashMap<String, String> createTestSinkProps() {
     HashMap<String, String> props = new HashMap<>();
     props.put(TOPIC_TO_REGION_BINDINGS, "[topic:region]");
     props.put(GeodeConnectorConfig.TASK_ID, "0");
-    props.put(NULL_VALUES_MEAN_REMOVE, String.valueOf(nullMeansRemove));
+    props.put(NULL_VALUES_MEAN_REMOVE, "true");
     props.put(GeodeConnectorConfig.LOCATORS, "localhost[10334]");
     return props;
   }
@@ -47,19 +47,19 @@ public class GeodeKafkaSinkTaskTest {
   @Test
   public void putRecordsAddsToRegionBatchRecords() {
     GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
-    HashMap<String, String> props = createTestSinkProps(true);
+    HashMap<String, String> props = createTestSinkProps();
 
     SinkRecord topicRecord = mock(SinkRecord.class);
     when(topicRecord.topic()).thenReturn("topic");
     when(topicRecord.value()).thenReturn("value");
     when(topicRecord.key()).thenReturn("key");
 
-    List<SinkRecord> records = new ArrayList();
+    List<SinkRecord> records = new ArrayList<>();
     records.add(topicRecord);
 
-    HashMap<String, Region> regionNameToRegion = new HashMap<>();
+    HashMap<String, Region<Object, Object>> regionNameToRegion = new HashMap<>();
     GeodeSinkConnectorConfig geodeSinkConnectorConfig = new GeodeSinkConnectorConfig(props);
-    HashMap<String, BatchRecords> batchRecordsMap = new HashMap();
+    HashMap<String, BatchRecords> batchRecordsMap = new HashMap<>();
     BatchRecords batchRecords = mock(BatchRecords.class);
     batchRecordsMap.put("region", batchRecords);
     task.configure(geodeSinkConnectorConfig);
@@ -73,19 +73,19 @@ public class GeodeKafkaSinkTaskTest {
   @Test
   public void newBatchRecordsAreCreatedIfOneDoesntExist() {
     GeodeKafkaSinkTask task = new GeodeKafkaSinkTask();
-    HashMap<String, String> props = createTestSinkProps(true);
+    HashMap<String, String> props = createTestSinkProps();
 
     SinkRecord topicRecord = mock(SinkRecord.class);
     when(topicRecord.topic()).thenReturn("topic");
     when(topicRecord.value()).thenReturn("value");
     when(topicRecord.key()).thenReturn("key");
 
-    List<SinkRecord> records = new ArrayList();
+    List<SinkRecord> records = new ArrayList<>();
     records.add(topicRecord);
 
-    HashMap<String, Region> regionNameToRegion = new HashMap<>();
+    HashMap<String, Region<Object, Object>> regionNameToRegion = new HashMap<>();
     GeodeSinkConnectorConfig geodeSinkConnectorConfig = new GeodeSinkConnectorConfig(props);
-    HashMap<String, BatchRecords> batchRecordsMap = new HashMap();
+    HashMap<String, BatchRecords> batchRecordsMap = new HashMap<>();
     task.configure(geodeSinkConnectorConfig);
     task.setRegionNameToRegion(regionNameToRegion);
 
diff --git a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
index 0ae65da..d3974cb 100644
--- a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
+++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java
@@ -37,7 +37,7 @@ public class GeodeKafkaSinkTest {
   @Test
   public void taskConfigsCreatesMaxNumberOfTasks() {
     GeodeKafkaSink sink = new GeodeKafkaSink();
-    Map<String, String> props = new HashMap();
+    Map<String, String> props = new HashMap<>();
     props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
     sink.start(props);
     Collection<Map<String, String>> tasks = sink.taskConfigs(5);
@@ -47,7 +47,7 @@ public class GeodeKafkaSinkTest {
   @Test
   public void sinkTaskConfigsAllAssignedEntireTopicToRegionBinding() {
     GeodeKafkaSink sink = new GeodeKafkaSink();
-    Map<String, String> props = new HashMap();
+    Map<String, String> props = new HashMap<>();
     props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
     sink.start(props);
     Collection<Map<String, String>> tasks = sink.taskConfigs(5);
@@ -59,11 +59,11 @@ public class GeodeKafkaSinkTest {
   @Test
   public void eachTaskHasUniqueTaskIds() {
     GeodeKafkaSink sink = new GeodeKafkaSink();
-    Map<String, String> props = new HashMap();
+    Map<String, String> props = new HashMap<>();
     props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]");
     sink.start(props);
     Collection<Map<String, String>> tasks = sink.taskConfigs(5);
-    HashSet<String> seenIds = new HashSet();
+    HashSet<String> seenIds = new HashSet<>();
     for (Map<String, String> taskProp : tasks) {
       assertTrue(seenIds.add(taskProp.get(GeodeConnectorConfig.TASK_ID)));
     }
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
index 07c9f0b..a85786b 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java
@@ -40,20 +40,21 @@ import org.junit.Test;
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.query.CqAttributes;
 import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqQuery;
 import org.apache.geode.cache.query.CqResults;
 import org.apache.geode.cache.query.Struct;
 import org.apache.geode.cache.query.internal.ResultsBag;
 import org.apache.geode.kafka.GeodeContext;
 
-
+@SuppressWarnings("unchecked")
 public class GeodeKafkaSourceTaskTest {
 
 
   @Test
   public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() {
     GeodeContext geodeContext = mock(GeodeContext.class);
-    BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
-    CqResults fakeInitialResults = new ResultsBag();
+    BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100);
+    CqResults<Object> fakeInitialResults = new ResultsBag();
     for (int i = 0; i < 10; i++) {
       fakeInitialResults.add(mock(Struct.class));
     }
@@ -69,14 +70,14 @@ public class GeodeKafkaSourceTaskTest {
   @Test
   public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() {
     GeodeContext geodeContext = mock(GeodeContext.class);
-    BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
-    CqResults fakeInitialResults = new ResultsBag();
+    BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100);
+    CqResults<Object> fakeInitialResults = new ResultsBag();
     for (int i = 0; i < 10; i++) {
       fakeInitialResults.add(mock(CqEvent.class));
     }
 
-    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
-        .thenReturn(fakeInitialResults);
+    when(geodeContext.newCq(anyString(), anyString(), any(), anyBoolean()))
+        .thenReturn(mock(CqQuery.class));
     GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
     task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
         "testRegion", DEFAULT_CQ_PREFIX, false, false);
@@ -86,10 +87,10 @@ public class GeodeKafkaSourceTaskTest {
   @Test
   public void cqListenerOnEventPopulatesEventsBuffer() {
     GeodeContext geodeContext = mock(GeodeContext.class);
-    BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
+    BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100);
 
-    when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean()))
-        .thenReturn(mock(CqResults.class));
+    when(geodeContext.newCq(anyString(), anyString(), any(), anyBoolean()))
+        .thenReturn(mock(CqQuery.class));
     GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
     GeodeKafkaSourceListener listener =
         task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer),
@@ -121,11 +122,12 @@ public class GeodeKafkaSourceTaskTest {
     when(geodeContext.getClientCache()).thenReturn(clientCache);
 
     Map<String, List<String>> regionToTopicsMap = new HashMap<>();
-    regionToTopicsMap.put("region1", new ArrayList());
+    regionToTopicsMap.put("region1", new ArrayList<>());
 
     GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
     when(config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
-
+    when(geodeContext.newCq(anyString(), anyString(), any(), anyBoolean()))
+        .thenReturn(mock(CqQuery.class));
     GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
     task.installOnGeode(config, geodeContext, null, "someCqPrefix", false);
     verify(geodeContext, times(1)).newCq(anyString(), anyString(), any(), anyBoolean());
@@ -140,7 +142,7 @@ public class GeodeKafkaSourceTaskTest {
     when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class),
         anyBoolean())).thenReturn(new ResultsBag());
     Map<String, List<String>> regionToTopicsMap = new HashMap<>();
-    regionToTopicsMap.put("region1", new ArrayList());
+    regionToTopicsMap.put("region1", new ArrayList<>());
 
     GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class);
     when(config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet());
@@ -167,31 +169,6 @@ public class GeodeKafkaSourceTaskTest {
   }
 
   @Test
-  public void pollReturnsEventsWhenEventBufferHasValues() throws Exception {
-    // BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100);
-    // CqEvent cqEvent = mock(CqEvent.class);
-    // when(cqEvent.getNewValue()).thenReturn("New Value");
-    // GeodeEvent event = mock(GeodeEvent.class);
-    // when(event.getEvent()).thenReturn(cqEvent);
-    // eventBuffer.add(event);
-    //
-    // List<String> topics = new ArrayList<>();
-    // topics.add("myTopic");
-    //
-    // GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-    // task.startForTesting(eventBuffer, topics, 1);
-    // List<SourceRecord> records = task.poll();
-    // assertEquals(1, records.size());
-  }
-
-  @Test
-  public void installOnGeodeShouldCallCq() {
-    GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-  }
-
-
-
-  @Test
   public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() {
     GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
     List<String> regionNames = Arrays.asList("region1", "region2", "region3");
@@ -202,35 +179,7 @@ public class GeodeKafkaSourceTaskTest {
     assertThat(true, is(sourcePartitions.get("region3").get(REGION_PARTITION).equals("region3")));
   }
 
-  @Test
-  public void listOfLocatorsShouldBeConfiguredIntoClientCache() {
-
-  }
-
-  @Test
-  public void shouldNotBeDurableIfDurableClientIdIsNull() {
-
-  }
-
-  @Test
-  public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() {
-
-  }
-
-
-  @Test
-  public void cqPrefixShouldBeProperlyCalculatedFromProps() {
-    // GeodeContext geodeContext = mock(GeodeContext.class);
-    // GeodeKafkaSourceTask task = new GeodeKafkaSourceTask();
-  }
-
-
   private EventBufferSupplier createEventBufferSupplier(BlockingQueue<GeodeEvent> eventBuffer) {
-    return new EventBufferSupplier() {
-      @Override
-      public BlockingQueue<GeodeEvent> get() {
-        return eventBuffer;
-      }
-    };
+    return () -> eventBuffer;
   }
 }
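
Editor's note: the last hunk above collapses the anonymous EventBufferSupplier into a lambda, which works because the interface has a single abstract method. A stand-alone sketch with a stand-in interface (the real EventBufferSupplier lives in org.apache.geode.kafka.source and returns a BlockingQueue<GeodeEvent>):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class LambdaSupplierSketch {
      // Stand-in for the connector's EventBufferSupplier functional interface.
      interface BufferSupplier {
        BlockingQueue<String> get();
      }

      public static void main(String[] args) {
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>(100);
        BufferSupplier supplier = () -> buffer; // replaces the anonymous inner class
        System.out.println(supplier.get() == buffer); // true: the same queue is returned
      }
    }
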
diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
index 38e9498..3019101 100644
--- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
+++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java
@@ -32,7 +32,7 @@ public class GeodeKafkaSourceTest {
   @Test
   public void taskConfigsCreatesMaxNumberOfTasks() {
     GeodeKafkaSource source = new GeodeKafkaSource();
-    Map<String, String> props = new HashMap();
+    HashMap<String, String> props = new HashMap<>();
     props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
     source.start(props);
     Collection<Map<String, String>> tasks = source.taskConfigs(5);
@@ -42,7 +42,7 @@ public class GeodeKafkaSourceTest {
   @Test
   public void sourceTaskConfigsAllAssignedEntireRegionToTopicBinding() {
     GeodeKafkaSource source = new GeodeKafkaSource();
-    Map<String, String> props = new HashMap();
+    HashMap<String, String> props = new HashMap<>();
     props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
     source.start(props);
     Collection<Map<String, String>> tasks = source.taskConfigs(5);
@@ -54,11 +54,11 @@ public class GeodeKafkaSourceTest {
   @Test
   public void eachTaskHasUniqueTaskIds() {
     GeodeKafkaSource sink = new GeodeKafkaSource();
-    Map<String, String> props = new HashMap();
+    HashMap<String, String> props = new HashMap<>();
     props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]");
     sink.start(props);
     Collection<Map<String, String>> tasks = sink.taskConfigs(5);
-    HashSet<String> seenIds = new HashSet();
+    HashSet<String> seenIds = new HashSet<>();
     for (Map<String, String> taskProp : tasks) {
       assertTrue(seenIds.add(taskProp.get(GeodeConnectorConfig.TASK_ID)));
     }
diff --git a/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
index 198e129..a2b38f2 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java
@@ -44,15 +44,13 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.junit.rules.TemporaryFolder;
 
 public class GeodeKafkaTestUtils {
-  public static ZooKeeperLocalCluster startZooKeeper(Properties zookeeperProperties)
+  public static void startZooKeeper(Properties zookeeperProperties)
       throws IOException, QuorumPeerConfig.ConfigException {
     ZooKeeperLocalCluster zooKeeperLocalCluster = new ZooKeeperLocalCluster(zookeeperProperties);
     zooKeeperLocalCluster.start();
-    return zooKeeperLocalCluster;
   }
 
-  public static KafkaLocalCluster startKafka(Properties kafkaProperties)
-      throws IOException, InterruptedException {
+  public static KafkaLocalCluster startKafka(Properties kafkaProperties) {
     KafkaLocalCluster kafkaLocalCluster = new KafkaLocalCluster(kafkaProperties);
     kafkaLocalCluster.start();
     return kafkaLocalCluster;
@@ -85,9 +83,7 @@ public class GeodeKafkaTestUtils {
         StringSerializer.class.getName());
 
     // Create the producer using props.
-    final Producer<String, String> producer =
-        new KafkaProducer<>(props);
-    return producer;
+    return new KafkaProducer<>(props);
   }
 
   public static Properties getZooKeeperProperties(TemporaryFolder temporaryFolder)
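
Editor's note: the createProducer change above returns the KafkaProducer directly instead of going through a local variable. For reference, a minimal stand-alone producer built the same way; the broker address and topic name are placeholders, not values from this repository:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // try-with-resources closes the producer and flushes pending records.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
          producer.send(new ProducerRecord<>("someTopic", "key", "value")); // placeholder topic
        }
      }
    }
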
diff --git a/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java b/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java
index cb66f84..7f3016f 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java
@@ -20,13 +20,13 @@ import java.io.IOException;
 public class JavaProcess {
 
   public Process process;
-  Class classWithMain;
+  final Class<?> classWithMain;
 
-  public JavaProcess(Class classWithMain) {
+  public JavaProcess(Class<?> classWithMain) {
     this.classWithMain = classWithMain;
   }
 
-  public void exec(String... args) throws IOException, InterruptedException {
+  public void exec(String... args) throws IOException {
     String java =
         System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
     String classpath = System.getProperty("java.class.path");
@@ -44,10 +44,6 @@ public class JavaProcess {
     process = builder.inheritIO().start();
   }
 
-  public void waitFor() throws InterruptedException {
-    process.waitFor();
-  }
-
   public void destroy() {
     process.destroy();
   }
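
Editor's note: JavaProcess above forks a second JVM that reuses the parent's java.home and classpath, which is how these tests launch the worker/herder out of process. A compact stand-alone sketch of the same pattern; it runs "-version" so the sketch works without assuming any particular main class:

    import java.io.File;
    import java.io.IOException;

    public class ForkJvmSketch {
      public static void main(String[] args) throws IOException, InterruptedException {
        // Same JVM binary and classpath as the current process.
        String java = System.getProperty("java.home") + File.separator + "bin"
            + File.separator + "java";
        String classpath = System.getProperty("java.class.path");
        ProcessBuilder builder = new ProcessBuilder(java, "-cp", classpath, "-version");
        Process process = builder.inheritIO().start(); // child shares this process's stdio
        System.out.println("exit code: " + process.waitFor());
      }
    }
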
diff --git a/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java b/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java
index f342aa4..c379615 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java
@@ -14,7 +14,6 @@
  */
 package org.apache.geode.kafka.utilities;
 
-import java.io.IOException;
 import java.util.Properties;
 
 import kafka.server.KafkaConfig;
@@ -22,9 +21,9 @@ import kafka.server.KafkaServerStartable;
 
 public class KafkaLocalCluster {
 
-  KafkaServerStartable kafka;
+  final KafkaServerStartable kafka;
 
-  public KafkaLocalCluster(Properties kafkaProperties) throws IOException, InterruptedException {
+  public KafkaLocalCluster(Properties kafkaProperties) {
     KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
     kafka = new KafkaServerStartable(kafkaConfig);
   }
diff --git a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
index 098b2e6..000842e 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java
@@ -18,20 +18,16 @@ import java.io.IOException;
 
 public class WorkerAndHerderCluster {
 
-  private JavaProcess workerAndHerder;
+  private final JavaProcess workerAndHerder;
 
   public WorkerAndHerderCluster() {
     workerAndHerder = new JavaProcess(WorkerAndHerderWrapper.class);
   }
 
-  public void start(String maxTasks) throws IOException, InterruptedException {
-    workerAndHerder.exec(maxTasks);
-  }
-
   public void start(String maxTasks, String sourceRegion, String sinkRegion, String sourceTopic,
-      String sinkTopic, String offsetPath, String locatorString, String keyConverter,
-      String keyConverterArgs, String valueConverter, String valueConverterArgs)
-      throws IOException, InterruptedException {
+                    String sinkTopic, String offsetPath, String locatorString, String keyConverter,
+                    String keyConverterArgs, String valueConverter, String valueConverterArgs)
+      throws IOException {
     String[] args = new String[] {maxTasks, sourceRegion, sinkRegion, sourceTopic, sinkTopic,
         offsetPath, locatorString, keyConverter, keyConverterArgs, valueConverter,
         valueConverterArgs};
diff --git a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
index 3723018..5c7a7e4 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java
@@ -16,7 +16,6 @@ package org.apache.geode.kafka.utilities;
 
 import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -41,7 +40,7 @@ import org.apache.geode.kafka.source.GeodeKafkaSource;
 
 public class WorkerAndHerderWrapper {
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) {
     if (args.length != 11) {
       throw new RuntimeException("Insufficient arguments to start workers and herders");
     }
@@ -67,7 +66,7 @@ public class WorkerAndHerderWrapper {
       valueConverterProps = parseArguments(valueConverterArgs, false);
     }
 
-    Map props = new HashMap();
+    HashMap<String, String> props = new HashMap<>();
     props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     props.put("offset.storage.file.filename", offsetPath);
     // fast flushing for testing.
diff --git a/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java b/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java
index 8e5e7a9..42a237c 100644
--- a/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java
+++ b/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.kafka.utilities;
 
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.Properties;
 
@@ -25,7 +27,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 public class ZooKeeperLocalCluster {
 
   ZooKeeperServerMain zooKeeperServer;
-  private Properties zooKeeperProperties;
+  private final Properties zooKeeperProperties;
   Thread zooKeeperThread;
 
   public ZooKeeperLocalCluster(Properties zooKeeperProperties) {
@@ -40,16 +42,13 @@ public class ZooKeeperLocalCluster {
     final ServerConfig configuration = new ServerConfig();
     configuration.readFrom(quorumConfiguration);
 
-    zooKeeperThread = new Thread() {
-      public void run() {
-        try {
-          zooKeeperServer.runFromConfig(configuration);
-        } catch (IOException | AdminServer.AdminServerException e) {
-          System.out.println("ZooKeeper Failed");
-          e.printStackTrace(System.err);
-        }
+    zooKeeperThread = new Thread(() -> {
+      try {
+        zooKeeperServer.runFromConfig(configuration);
+      } catch (IOException | AdminServer.AdminServerException e) {
+        fail("Unable to start ZooKeeper cluster");
       }
-    };
+    });
     zooKeeperThread.start();
     System.out.println("ZooKeeper thread started");
 
