This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 5189d2d Changing the project to a maven project 5189d2d is described below commit 5189d2d91d8b22745ac7333c9cfa9185b513dab1 Author: Naburun Nag <n...@cs.wisc.edu> AuthorDate: Tue Feb 25 11:01:00 2020 -0800 Changing the project to a maven project --- .gitignore | 192 ++++---------- NOTICE | 6 + logos/APACHE-20th-logo.png | Bin 0 -> 466136 bytes logos/Apache_Geode_logo_symbol.png | Bin 0 -> 13871 bytes logos/PoweredBYlogo.png | Bin 0 -> 84350 bytes pom.xml | 293 +++++++++++++++++++++ src/assembly/development.xml | 23 ++ src/assembly/package.xml | 33 +++ src/assembly/standalone.xml | 33 +++ .../apache/geode/kafka/GeodeConnectorConfig.java | 17 +- .../java/org/apache/geode/kafka/GeodeContext.java | 17 +- .../org/apache/geode/kafka/LocatorHostPort.java | 4 +- .../kafka/security/SystemPropertyAuthInit.java | 2 +- .../org/apache/geode/kafka/sink/BatchRecords.java | 12 +- .../geode/kafka/sink/GeodeKafkaSinkTask.java | 43 +-- .../geode/kafka/sink/GeodeSinkConnectorConfig.java | 2 +- .../org/apache/geode/kafka/source/GeodeEvent.java | 6 +- .../geode/kafka/source/GeodeKafkaSource.java | 4 - .../kafka/source/GeodeKafkaSourceListener.java | 4 +- .../geode/kafka/source/GeodeKafkaSourceTask.java | 38 ++- .../kafka/source/GeodeSourceConnectorConfig.java | 4 +- .../kafka/source/SharedEventBufferSupplier.java | 3 +- .../apache/geode/kafka/GeodeAsSinkDUnitTest.java | 12 +- .../apache/geode/kafka/GeodeAsSourceDUnitTest.java | 10 +- .../geode/kafka/GeodeConnectorConfigTest.java | 11 +- .../kafka/converter/JsonPdxConverterDUnitTest.java | 4 +- .../apache/geode/kafka/sink/BatchRecordsTest.java | 27 +- .../geode/kafka/sink/GeodeKafkaSinkTaskTest.java | 20 +- .../geode/kafka/sink/GeodeKafkaSinkTest.java | 8 +- .../kafka/source/GeodeKafkaSourceTaskTest.java | 83 ++---- .../geode/kafka/source/GeodeKafkaSourceTest.java | 8 +- .../geode/kafka/utilities/GeodeKafkaTestUtils.java | 10 +- .../apache/geode/kafka/utilities/JavaProcess.java | 10 +- .../geode/kafka/utilities/KafkaLocalCluster.java | 5 +- .../kafka/utilities/WorkerAndHerderCluster.java | 12 +- .../kafka/utilities/WorkerAndHerderWrapper.java | 5 +- .../kafka/utilities/ZooKeeperLocalCluster.java | 19 +- 37 files changed, 613 insertions(+), 367 deletions(-) diff --git a/.gitignore b/.gitignore index 2e856f5..89a3ea2 100644 --- a/.gitignore +++ b/.gitignore @@ -30,13 +30,13 @@ # When using Gradle or Maven with auto-import, you should exclude module files, # since they will be recreated, and may cause churn. Uncomment if using # auto-import. -# .idea/artifacts -# .idea/compiler.xml -# .idea/modules.xml -# .idea/*.iml -# .idea/modules -# *.iml -# *.ipr + .idea/artifacts + .idea/compiler.xml + .idea/modules.xml + .idea/*.iml + .idea/modules + *.iml + *.ipr # CMake cmake-build-*/ @@ -71,105 +71,46 @@ fabric.properties # Android studio 3.1+ serialized cache file .idea/caches/build_file_checksums.ser -### Gradle template -.gradle -/build/ +### Maven template +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +# https://github.com/takari/maven-wrapper#usage-without-binary-jar +.mvn/wrapper/maven-wrapper.jar -# Ignore Gradle GUI config -gradle-app.setting - -# Avoid ignoring Gradle wrapper jar file (.jar files are usually ignored) -!gradle-wrapper.jar - -# Cache of project -.gradletasknamecache - -# # Work around https://youtrack.jetbrains.com/issue/IDEA-116898 -# gradle/wrapper/gradle-wrapper.properties - -### Eclipse template -.metadata -bin/ -tmp/ -*.tmp -*.bak -*.swp -*~.nib -local.properties -.settings/ -.loadpath -.recommenders - -# External tool builders -.externalToolBuilders/ - -# Locally stored "Eclipse launch configurations" -*.launch - -# PyDev specific (Python IDE for Eclipse) -*.pydevproject - -# CDT-specific (C/C++ Development Tooling) -.cproject - -# CDT- autotools -.autotools - -# Java annotation processor (APT) -.factorypath - -# PDT-specific (PHP Development Tools) -.buildpath - -# sbteclipse plugin -.target - -# Tern plugin -.tern-project - -# TeXlipse plugin -.texlipse - -# STS (Spring Tool Suite) -.springBeans - -# Code Recommenders -.recommenders/ - -# Annotation Processing -.apt_generated/ -.apt_generated_test/ - -# Scala IDE specific (Scala & Java development for Eclipse) -.cache-main -.scala_dependencies -.worksheet - -### Windows template -# Windows thumbnail cache files -Thumbs.db -Thumbs.db:encryptable -ehthumbs.db -ehthumbs_vista.db - -# Dump file -*.stackdump +### macOS template +# General +.DS_Store +.AppleDouble +.LSOverride -# Folder config file -[Dd]esktop.ini +# Icon must end with two \r +Icon -# Recycle Bin used on file shares -$RECYCLE.BIN/ +# Thumbnails +._* -# Windows Installer files -*.cab -*.msi -*.msix -*.msm -*.msp +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent -# Windows shortcuts -*.lnk +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk ### Java template # Compiled class file @@ -196,43 +137,16 @@ $RECYCLE.BIN/ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* -### macOS template -# General -.DS_Store -.AppleDouble -.LSOverride - -# Icon must end with two \r -Icon - -# Thumbnails -._* +.idea +.idea/** +.dunit/ +.dunit/** +.dunit/* -# Files that might appear in the root of a volume -.DocumentRevisions-V100 -.fseventsd -.Spotlight-V100 -.TemporaryItems -.Trashes -.VolumeIcon.icns -.com.apple.timemachine.donotpresent - -# Directories potentially created on remote AFP share -.AppleDB -.AppleDesktop -Network Trash Folder -Temporary Items -.apdisk -### NetBeans template -**/nbproject/private/ -**/nbproject/Makefile-*.mk -**/nbproject/Package-*.bash -build/ -nbbuild/ -dist/ -nbdist/ -.nb-gradle/ - -.idea/ -**/dunit +dunit/locator/locator* +dunit/vm0/ +dunit/vm1/ +dunit/vm2/ +dunit/vm3/ +kafka-connect-geode.iml \ No newline at end of file diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..5c832f9 --- /dev/null +++ b/NOTICE @@ -0,0 +1,6 @@ +Apache Geode +Copyright 2016-2020 The Apache Software Foundation. + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + diff --git a/logos/APACHE-20th-logo.png b/logos/APACHE-20th-logo.png new file mode 100644 index 0000000..524eb4d Binary files /dev/null and b/logos/APACHE-20th-logo.png differ diff --git a/logos/Apache_Geode_logo_symbol.png b/logos/Apache_Geode_logo_symbol.png new file mode 100644 index 0000000..88f3996 Binary files /dev/null and b/logos/Apache_Geode_logo_symbol.png differ diff --git a/logos/PoweredBYlogo.png b/logos/PoweredBYlogo.png new file mode 100644 index 0000000..333a00b Binary files /dev/null and b/logos/PoweredBYlogo.png differ diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..7a423ee --- /dev/null +++ b/pom.xml @@ -0,0 +1,293 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache</groupId> + <artifactId>kafka-connect-geode</artifactId> + <packaging>jar</packaging> + <name>kafka-connect-geode</name> + <organization> + <name>Apache Software Foundation</name> + <url>https://geode.apache.org/</url> + </organization> + <url>https://geode.apache.org/</url> + <description> + Apache Geode Sink and Source connector for Kafka Connect + </description> + <licenses> + <license> + <name>Apache License 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.html</url> + <distribution>repo</distribution> + </license> + </licenses> + + <scm> + <connection>scm:git:git://https://github.com/apache/geode-kafka-connector.git</connection> + <developerConnection>scm:git:g...@github.com:apache/geode-kafka-connector.git</developerConnection> + <url>https://github.com/apache/geode-kafka-connector</url> + <tag>HEAD</tag> + </scm> + + <version>1.0-SNAPSHOT</version> + + <properties> + <geode.core.version>1.9.0</geode.core.version> + <geode.cq.version>1.9.0</geode.cq.version> + <geode.dunit.version>1.9.0</geode.dunit.version> + <kafka.connect-api.version>2.3.1</kafka.connect-api.version> + <log4j.version>2.13.0</log4j.version> + <kafka_2.12.version>2.3.1</kafka_2.12.version> + <curator-framework.version>4.2.0</curator-framework.version> + <kafka-streams-test-utils.version>1.1.0</kafka-streams-test-utils.version> + <kafka.connect-runtime.version>2.3.1</kafka.connect-runtime.version> + <junit.version>4.12</junit.version> + <mockito.version>3.2.4</mockito.version> + <JUnitParams.version>1.1.1</JUnitParams.version> + <awaitility.version>3.1.6</awaitility.version> + <maven-plugin.version>3.8.1</maven-plugin.version> + <zookeeper.version>3.5.7</zookeeper.version> + <confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo> + </properties> + + <repositories> + <repository> + <id>confluent</id> + <name>Confluent</name> + <url>${confluent.maven.repo}</url> + </repository> + </repositories> + + <dependencies> + <!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-api</artifactId> + <version>${kafka.connect-api.version}</version> + <scope>provided</scope> + </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.geode/geode-core --> + <dependency> + <groupId>org.apache.geode</groupId> + <artifactId>geode-core</artifactId> + <version>${geode.core.version}</version> + </dependency> + <dependency> + <groupId>org.apache.geode</groupId> + <artifactId>geode-cq</artifactId> + <version>${geode.cq.version}</version> + </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api --> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>${log4j.version}</version> + </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api --> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${log4j.version}</version> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>${zookeeper.version}</version> + <scope>test</scope> + </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.12</artifactId> + <version>${kafka_2.12.version}</version> + <scope>test</scope> + </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-test-utils --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-streams-test-utils</artifactId> + <version>${kafka-streams-test-utils.version}</version> + <scope>test</scope> + </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>${curator-framework.version}</version> + <scope>test</scope> + </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-runtime --> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-runtime</artifactId> + <version>${kafka.connect-runtime.version}</version> + <scope>test</scope> + </dependency> + <!-- https://mvnrepository.com/artifact/junit/junit --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + <!-- https://mvnrepository.com/artifact/org.mockito/mockito-core --> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + <!-- https://mvnrepository.com/artifact/pl.pragmatists/JUnitParams --> + <dependency> + <groupId>pl.pragmatists</groupId> + <artifactId>JUnitParams</artifactId> + <version>${JUnitParams.version}</version> + <scope>test</scope> + </dependency> + <!-- https://mvnrepository.com/artifact/org.apache.geode/geode-dunit --> + <dependency> + <groupId>org.apache.geode</groupId> + <artifactId>geode-dunit</artifactId> + <version>${geode.dunit.version}</version> + <scope>test</scope> + </dependency> + <!-- https://mvnrepository.com/artifact/org.awaitility/awaitility --> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>${awaitility.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>io.confluent</groupId> + <version>0.10.0</version> + <artifactId>kafka-connect-maven-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>kafka-connect</goal> + </goals> + <configuration> + <title>Kafka Connect Apache Geode</title> + <documentationUrl>https://geode.apache.org/docs/</documentationUrl> + <description> + The Apache Geode connector can be used to move data from Kakfa to Geode and vice versa. The Sink take data from a Kafka topic and puts in a region in Geode, while the Source will move any data inserted into Geode region, to Kafka topics . + + Apache Geode is an in-memory data grid which stores data in a key-value format. When the Geode acts as a Sink, the key value pair is extracted from the Sink Record from the Kafka topic and that key-value pair is stored in Geode regions. When Geode acts as a Source, whenever a key-value pair is inserted into the region, an event is sent to connector containing the data. This data is then placed into the Kafka topic. + + The mapping between the region and topics must be provided by the user. They must also ensure that the Apache Geode cluster is up and running along with all the required regions along with the topics in Kafka. The Apache Geode cluster must consist of at least one locator and one server. + </description> + <logo>logos/elasticsearch.jpg</logo> + + <supportProviderName>Apache Software Foundation / VMware</supportProviderName> + <supportSummary>VMware, along with Apache Geode community members support the Apache Geode Connector </supportSummary> + <supportUrl>https://geode.apache.org/docs/</supportUrl> + <supportLogo>logos/PoweredBYlogo.png</supportLogo> + + <ownerUsername>apache</ownerUsername> + <ownerType>organization</ownerType> + <ownerName>Apache Software Foundation.</ownerName> + <ownerUrl>https://www.apache.org/</ownerUrl> + <ownerLogo>logos/APACHE-20th-logo.png</ownerLogo> + + <componentTypes> + <componentType>sink</componentType> + <componentType>source</componentType> + </componentTypes> + + <tags> + <tag>Apache</tag> + <tag>Geode</tag> + <tag>in-memory</tag> + <tag>datagrid</tag> + <tag>key-value</tag> + <tag>GemFire</tag> + </tags> + + <requirements> + <requirement>Apache Geode 1.9 or above</requirement> + </requirements> + + <deliveryGuarantee> + <deliveryGuarantee>atLeastOnce</deliveryGuarantee> + </deliveryGuarantee> + + <confluentControlCenterIntegration>true</confluentControlCenterIntegration> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-plugin.version}</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>${maven-plugin.version}</version> + <inherited>true</inherited> + <configuration> + <compilerArgs> + <arg>-Xlint:-processing</arg> + <arg>-Xlint:all</arg> + <arg>-Werror</arg> + </compilerArgs> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptors> + <descriptor>src/assembly/development.xml</descriptor> + <descriptor>src/assembly/package.xml</descriptor> + </descriptors> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> + </build> + <profiles> + <profile> + <id>standalone</id> + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptors> + <descriptor>src/assembly/standalone.xml</descriptor> + </descriptors> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> \ No newline at end of file diff --git a/src/assembly/development.xml b/src/assembly/development.xml new file mode 100644 index 0000000..ae37239 --- /dev/null +++ b/src/assembly/development.xml @@ -0,0 +1,23 @@ +<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd"> + <!-- Assembles all dependencies in target/ directory so scripts can easily run in a development + environment --> + <id>development</id> + <formats> + <format>dir</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>share/java/kafka-connect-geode/</outputDirectory> + <useProjectArtifact>true</useProjectArtifact> + <useTransitiveFiltering>true</useTransitiveFiltering> + <excludes> + <!-- Exclude these jars during packaging.--> + <exclude>org.apache.kafka:connect-api</exclude> + <exclude>org.apache.kafka:connect-json</exclude> + </excludes> + </dependencySet> + </dependencySets> +</assembly> diff --git a/src/assembly/package.xml b/src/assembly/package.xml new file mode 100644 index 0000000..9704986 --- /dev/null +++ b/src/assembly/package.xml @@ -0,0 +1,33 @@ +<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd"> + <!-- Assembles a packaged version targeting OS installation. --> + <id>package</id> + <formats> + <format>dir</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${project.basedir}</directory> + <outputDirectory>share/doc/kafka-connect-geode/</outputDirectory> + <includes> + <include>README*</include> + <include>LICENSE*</include> + <include>NOTICE*</include> + </includes> + </fileSet> + </fileSets> + <dependencySets> + <dependencySet> + <outputDirectory>share/java/kafka-connect-geode</outputDirectory> + <useProjectArtifact>true</useProjectArtifact> + <useTransitiveFiltering>true</useTransitiveFiltering> + <excludes> + <!-- Exclude these jars during packaging.--> + <exclude>org.apache.kafka:connect-api</exclude> + <exclude>org.apache.kafka:connect-json</exclude> + </excludes> + </dependencySet> + </dependencySets> +</assembly> diff --git a/src/assembly/standalone.xml b/src/assembly/standalone.xml new file mode 100644 index 0000000..9cf29e3 --- /dev/null +++ b/src/assembly/standalone.xml @@ -0,0 +1,33 @@ +<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd"> + <!-- Assembles a packaged jar that includes all dependencies. This still requires the kafka-connect + runtime, but allows running kafka-connect with a connector plugin using only a couple of jars. --> + <id>standalone</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${project.basedir}</directory> + <outputDirectory>/</outputDirectory> + <includes> + <include>README*</include> + <include>LICENSE*</include> + <include>NOTICE*</include> + <include>licenses.html</include> + <include>licenses/</include> + <include>notices/</include> + </includes> + </fileSet> + </fileSets> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <useProjectArtifact>true</useProjectArtifact> + <unpack>true</unpack> + <scope>runtime</scope> + </dependencySet> + </dependencySets> +</assembly> diff --git a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java index 846b1e9..717fef6 100644 --- a/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java +++ b/src/main/java/org/apache/geode/kafka/GeodeConnectorConfig.java @@ -54,7 +54,7 @@ public class GeodeConnectorConfig extends AbstractConfig { // Just for testing protected GeodeConnectorConfig() { - super(new ConfigDef(), new HashMap()); + super(new ConfigDef(), new HashMap<>()); taskId = 0; } @@ -104,14 +104,13 @@ public class GeodeConnectorConfig extends AbstractConfig { */ public static Map<String, List<String>> parseRegionToTopics(String combinedBindings) { if (combinedBindings == null || combinedBindings.equals("")) { - return new HashMap(); + return new HashMap<>(); } List<String> bindings = parseBindings(combinedBindings); - return bindings.stream().map(binding -> { - String[] regionToTopicsArray = parseBinding(binding); - return regionToTopicsArray; - }).collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], - regionToTopicsArray -> parseStringByComma(regionToTopicsArray[1]))); + return bindings.stream().map( + GeodeConnectorConfig::parseBinding) + .collect(Collectors.toMap(regionToTopicsArray -> regionToTopicsArray[0], + regionToTopicsArray -> parseStringByComma(regionToTopicsArray[1]))); } public static List<String> parseBindings(String bindings) { @@ -133,11 +132,11 @@ public class GeodeConnectorConfig extends AbstractConfig { } public static List<String> parseStringBy(String string, String regex) { - return Arrays.stream(string.split(regex)).map((s) -> s.trim()).collect(Collectors.toList()); + return Arrays.stream(string.split(regex)).map(String::trim).collect(Collectors.toList()); } public static String reconstructString(Collection<String> strings) { - return strings.stream().collect(Collectors.joining("],[")); + return String.join("],[", strings); } List<LocatorHostPort> parseLocators(String locators) { diff --git a/src/main/java/org/apache/geode/kafka/GeodeContext.java b/src/main/java/org/apache/geode/kafka/GeodeContext.java index 99ef11c..6be418f 100644 --- a/src/main/java/org/apache/geode/kafka/GeodeContext.java +++ b/src/main/java/org/apache/geode/kafka/GeodeContext.java @@ -39,16 +39,16 @@ public class GeodeContext { public GeodeContext() {} public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, - String durableClientId, String durableClientTimeout, String securityAuthInit, - String securityUserName, String securityPassword, boolean usesSecurity) { + String durableClientId, String durableClientTimeout, String securityAuthInit, + String securityUserName, String securityPassword, boolean usesSecurity) { clientCache = createClientCache(locatorHostPortList, durableClientId, durableClientTimeout, securityAuthInit, securityUserName, securityPassword, usesSecurity); return clientCache; } public ClientCache connectClient(List<LocatorHostPort> locatorHostPortList, - String securityAuthInit, String securityUserName, String securityPassword, - boolean usesSecurity) { + String securityAuthInit, String securityUserName, String securityPassword, + boolean usesSecurity) { clientCache = createClientCache(locatorHostPortList, "", "", securityAuthInit, securityUserName, securityPassword, usesSecurity); return clientCache; @@ -59,8 +59,8 @@ public class GeodeContext { } public ClientCache createClientCache(List<LocatorHostPort> locators, String durableClientName, - String durableClientTimeOut, String securityAuthInit, String securityUserName, - String securityPassword, boolean usesSecurity) { + String durableClientTimeOut, String securityAuthInit, String securityUserName, + String securityPassword, boolean usesSecurity) { ClientCacheFactory ccf = new ClientCacheFactory(); ccf.setPdxReadSerialized(true); @@ -96,8 +96,9 @@ public class GeodeContext { } } - public CqResults newCqWithInitialResults(String name, String query, CqAttributes cqAttributes, - boolean isDurable) throws ConnectException { + public <E> CqResults<E> newCqWithInitialResults(String name, String query, + CqAttributes cqAttributes, + boolean isDurable) throws ConnectException { try { CqQuery cq = clientCache.getQueryService().newCq(name, query, cqAttributes, isDurable); return cq.executeWithInitialResults(); diff --git a/src/main/java/org/apache/geode/kafka/LocatorHostPort.java b/src/main/java/org/apache/geode/kafka/LocatorHostPort.java index d879d8e..124f76d 100644 --- a/src/main/java/org/apache/geode/kafka/LocatorHostPort.java +++ b/src/main/java/org/apache/geode/kafka/LocatorHostPort.java @@ -16,8 +16,8 @@ package org.apache.geode.kafka; public class LocatorHostPort { - private String hostName; - private int port; + private final String hostName; + private final int port; public LocatorHostPort(String hostName, int port) { this.hostName = hostName; diff --git a/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java index 4f3e414..117b28e 100644 --- a/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java +++ b/src/main/java/org/apache/geode/kafka/security/SystemPropertyAuthInit.java @@ -24,7 +24,7 @@ import org.apache.geode.security.AuthenticationFailedException; public class SystemPropertyAuthInit implements AuthInitialize { @Override public Properties getCredentials(Properties securityProps, DistributedMember server, - boolean isPeer) throws AuthenticationFailedException { + boolean isPeer) throws AuthenticationFailedException { Properties extractedProperties = new Properties(); extractedProperties.put("security-username", securityProps.get("security-username")); extractedProperties.put("security-password", securityProps.get("security-password")); diff --git a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java index 9d64a03..7974abd 100644 --- a/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java +++ b/src/main/java/org/apache/geode/kafka/sink/BatchRecords.java @@ -15,9 +15,7 @@ package org.apache.geode.kafka.sink; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; -import java.util.Map; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; @@ -31,15 +29,15 @@ import org.apache.geode.cache.Region; public class BatchRecords { private static final Logger logger = LoggerFactory.getLogger(BatchRecords.class); - private Map updateMap; - private Collection removeList; + private final HashMap<Object, Object> updateMap; + private final ArrayList<Object> removeList; public BatchRecords() { - this(new HashMap(), new ArrayList()); + this(new HashMap<>(), new ArrayList<>()); } /** Used for tests **/ - public BatchRecords(Map updateMap, Collection removeList) { + public BatchRecords(HashMap<Object, Object> updateMap, ArrayList<Object> removeList) { this.updateMap = updateMap; this.removeList = removeList; } @@ -67,7 +65,7 @@ public class BatchRecords { } - public void executeOperations(Region region) { + public void executeOperations(Region<Object, Object> region) { if (region != null) { region.putAll(updateMap); region.removeAll(removeList); diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java index daf2274..1688eb7 100644 --- a/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java +++ b/src/main/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTask.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.slf4j.Logger; @@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionExistsException; +import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientRegionShortcut; import org.apache.geode.kafka.GeodeContext; @@ -41,7 +43,7 @@ public class GeodeKafkaSinkTask extends SinkTask { private GeodeContext geodeContext; private Map<String, List<String>> topicToRegions; - private Map<String, Region> regionNameToRegion; + private Map<String, Region<Object, Object>> regionNameToRegion; private boolean nullValuesMeansRemove = true; /** @@ -59,10 +61,16 @@ public class GeodeKafkaSinkTask extends SinkTask { GeodeSinkConnectorConfig geodeConnectorConfig = new GeodeSinkConnectorConfig(props); configure(geodeConnectorConfig); geodeContext = new GeodeContext(); - geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), - geodeConnectorConfig.getSecurityClientAuthInit(), - geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), - geodeConnectorConfig.usesSecurity()); + final ClientCache clientCache = + geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), + geodeConnectorConfig.getSecurityClientAuthInit(), + geodeConnectorConfig.getSecurityUserName(), + geodeConnectorConfig.getSecurityPassword(), + geodeConnectorConfig.usesSecurity()); + if (clientCache == null) { + throw new ConnectException( + "Unable to create a client cache connected to the Apache Geode cluster"); + } regionNameToRegion = createProxyRegions(topicToRegions.values()); } catch (Exception e) { logger.error("Unable to start sink task", e); @@ -72,19 +80,18 @@ public class GeodeKafkaSinkTask extends SinkTask { void configure(GeodeSinkConnectorConfig geodeConnectorConfig) { logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting"); - int taskId = geodeConnectorConfig.getTaskId(); topicToRegions = geodeConnectorConfig.getTopicToRegions(); nullValuesMeansRemove = geodeConnectorConfig.getNullValuesMeanRemove(); } // For tests only - void setRegionNameToRegion(Map<String, Region> regionNameToRegion) { + void setRegionNameToRegion(Map<String, Region<Object, Object>> regionNameToRegion) { this.regionNameToRegion = regionNameToRegion; } @Override public void put(Collection<SinkRecord> records) { - put(records, new HashMap()); + put(records, new HashMap<>()); } void put(Collection<SinkRecord> records, Map<String, BatchRecords> batchRecordsMap) { @@ -92,15 +99,12 @@ public class GeodeKafkaSinkTask extends SinkTask { for (SinkRecord record : records) { updateBatchForRegionByTopic(record, batchRecordsMap); } - batchRecordsMap.entrySet().stream().forEach((entry) -> { - String region = entry.getKey(); - BatchRecords batchRecords = entry.getValue(); - batchRecords.executeOperations(regionNameToRegion.get(region)); - }); + batchRecordsMap.forEach( + (region, batchRecords) -> batchRecords.executeOperations(regionNameToRegion.get(region))); } private void updateBatchForRegionByTopic(SinkRecord sinkRecord, - Map<String, BatchRecords> batchRecordsMap) { + Map<String, BatchRecords> batchRecordsMap) { Collection<String> regionsToUpdate = topicToRegions.get(sinkRecord.topic()); for (String region : regionsToUpdate) { updateBatchRecordsForRecord(sinkRecord, batchRecordsMap, region); @@ -108,7 +112,7 @@ public class GeodeKafkaSinkTask extends SinkTask { } private void updateBatchRecordsForRecord(SinkRecord record, - Map<String, BatchRecords> batchRecordsMap, String region) { + Map<String, BatchRecords> batchRecordsMap, String region) { BatchRecords batchRecords = batchRecordsMap.get(region); if (batchRecords == null) { batchRecords = new BatchRecords(); @@ -126,13 +130,14 @@ public class GeodeKafkaSinkTask extends SinkTask { } } - private Map<String, Region> createProxyRegions(Collection<List<String>> regionNames) { + private Map<String, Region<Object, Object>> createProxyRegions( + Collection<List<String>> regionNames) { List<String> flat = regionNames.stream().flatMap(List::stream).collect(Collectors.toList()); - return flat.stream().map(regionName -> createProxyRegion(regionName)) - .collect(Collectors.toMap(region -> region.getName(), region -> region)); + return flat.stream().map(this::createProxyRegion) + .collect(Collectors.toMap(Region::getName, region -> region)); } - private Region createProxyRegion(String regionName) { + private Region<Object, Object> createProxyRegion(String regionName) { try { return geodeContext.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY) .create(regionName); diff --git a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java index b12eb8f..cd49778 100644 --- a/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java +++ b/src/main/java/org/apache/geode/kafka/sink/GeodeSinkConnectorConfig.java @@ -30,7 +30,7 @@ public class GeodeSinkConnectorConfig extends GeodeConnectorConfig { public static final String NULL_VALUES_MEAN_REMOVE = "null-values-mean-remove"; public static final String DEFAULT_NULL_VALUES_MEAN_REMOVE = "true"; - private Map<String, List<String>> topicToRegions; + private final Map<String, List<String>> topicToRegions; private final boolean nullValuesMeanRemove; public GeodeSinkConnectorConfig(Map<String, String> connectorProperties) { diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java b/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java index 654b05a..b801356 100644 --- a/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeEvent.java @@ -21,9 +21,9 @@ import org.apache.geode.cache.query.CqEvent; */ public class GeodeEvent { - private String regionName; - private Object key; - private Object value; + private final String regionName; + private final Object key; + private final Object value; public GeodeEvent(String regionName, CqEvent event) { this(regionName, event.getKey(), event.getNewValue()); diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java index 89054a6..ca6dd0c 100644 --- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSource.java @@ -79,8 +79,4 @@ public class GeodeKafkaSource extends SourceConnector { // TODO return AppInfoParser.getVersion(); } - - public Map<String, String> getSharedProps() { - return sharedProps; - } } diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java index 1d16404..8a54766 100644 --- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java @@ -26,8 +26,8 @@ class GeodeKafkaSourceListener implements CqStatusListener { private static final Logger logger = LoggerFactory.getLogger(GeodeKafkaSourceListener.class); - public String regionName; - private EventBufferSupplier eventBufferSupplier; + public final String regionName; + private final EventBufferSupplier eventBufferSupplier; private boolean initialResultsLoaded; public GeodeKafkaSourceListener(EventBufferSupplier eventBufferSupplier, String regionName) { diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java index 8f26c1e..182efff 100644 --- a/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java @@ -21,13 +21,16 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.query.CqAttributes; import org.apache.geode.cache.query.CqAttributesFactory; +import org.apache.geode.cache.query.CqQuery; import org.apache.geode.cache.query.CqResults; import org.apache.geode.cache.query.Struct; import org.apache.geode.kafka.GeodeContext; @@ -66,12 +69,18 @@ public class GeodeKafkaSourceTask extends SourceTask { GeodeSourceConnectorConfig geodeConnectorConfig = new GeodeSourceConnectorConfig(props); logger.debug("GeodeKafkaSourceTask id:" + geodeConnectorConfig.getTaskId() + " starting"); geodeContext = new GeodeContext(); - geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), - geodeConnectorConfig.getDurableClientId(), geodeConnectorConfig.getDurableClientTimeout(), - geodeConnectorConfig.getSecurityClientAuthInit(), - geodeConnectorConfig.getSecurityUserName(), geodeConnectorConfig.getSecurityPassword(), - geodeConnectorConfig.usesSecurity()); - + final ClientCache clientCache = + geodeContext.connectClient(geodeConnectorConfig.getLocatorHostPorts(), + geodeConnectorConfig.getDurableClientId(), + geodeConnectorConfig.getDurableClientTimeout(), + geodeConnectorConfig.getSecurityClientAuthInit(), + geodeConnectorConfig.getSecurityUserName(), + geodeConnectorConfig.getSecurityPassword(), + geodeConnectorConfig.usesSecurity()); + if (clientCache == null) { + throw new ConnectException( + "Unable to create an client cache connected to Apache Geode cluster"); + } batchSize = geodeConnectorConfig.getBatchSize(); eventBufferSupplier = new SharedEventBufferSupplier(geodeConnectorConfig.getQueueSize()); @@ -90,7 +99,7 @@ public class GeodeKafkaSourceTask extends SourceTask { } @Override - public List<SourceRecord> poll() throws InterruptedException { + public List<SourceRecord> poll() { ArrayList<SourceRecord> records = new ArrayList<>(batchSize); ArrayList<GeodeEvent> events = new ArrayList<>(batchSize); if (eventBufferSupplier.get().drainTo(events, batchSize) > 0) { @@ -114,7 +123,7 @@ public class GeodeKafkaSourceTask extends SourceTask { } void installOnGeode(GeodeSourceConnectorConfig geodeConnectorConfig, GeodeContext geodeContext, - EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) { + EventBufferSupplier eventBuffer, String cqPrefix, boolean loadEntireRegion) { boolean isDurable = geodeConnectorConfig.isDurable(); int taskId = geodeConnectorConfig.getTaskId(); for (String region : geodeConnectorConfig.getCqsToRegister()) { @@ -127,26 +136,29 @@ public class GeodeKafkaSourceTask extends SourceTask { } GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId, - EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, - boolean isDurable) { + EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion, + boolean isDurable) { CqAttributesFactory cqAttributesFactory = new CqAttributesFactory(); GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName); cqAttributesFactory.addCqListener(listener); CqAttributes cqAttributes = cqAttributesFactory.create(); try { if (loadEntireRegion) { - CqResults events = + CqResults<?> events = geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, isDurable); eventBuffer.get() - .addAll((Collection<GeodeEvent>) events.stream().map( + .addAll(events.stream().map( e -> new GeodeEvent(regionName, ((Struct) e).get("key"), ((Struct) e).get("value"))) .collect(Collectors.toList())); } else { - geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), + final CqQuery cqQuery = geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName), "select * from /" + regionName, cqAttributes, isDurable); + if (cqQuery == null) { + throw new ConnectException("Unable to executed queries on the Apache Geode server"); + } } } finally { listener.signalInitialResultsLoaded(); diff --git a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java index a004f23..01294c9 100644 --- a/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java +++ b/src/main/java/org/apache/geode/kafka/source/GeodeSourceConnectorConfig.java @@ -60,8 +60,8 @@ public class GeodeSourceConnectorConfig extends GeodeConnectorConfig { private final int batchSize; private final int queueSize; - private Map<String, List<String>> regionToTopics; - private Collection<String> cqsToRegister; + private final Map<String, List<String>> regionToTopics; + private final Collection<String> cqsToRegister; public GeodeSourceConnectorConfig(Map<String, String> connectorProperties) { super(SOURCE_CONFIG_DEF, connectorProperties); diff --git a/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java b/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java index b3d1268..dbbf902 100644 --- a/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java +++ b/src/main/java/org/apache/geode/kafka/source/SharedEventBufferSupplier.java @@ -25,7 +25,7 @@ public class SharedEventBufferSupplier implements EventBufferSupplier { recreateEventBufferIfNeeded(size); } - BlockingQueue recreateEventBufferIfNeeded(int size) { + void recreateEventBufferIfNeeded(int size) { if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) { synchronized (GeodeKafkaSource.class) { if (eventBuffer == null || (eventBuffer.size() + eventBuffer.remainingCapacity()) != size) { @@ -37,7 +37,6 @@ public class SharedEventBufferSupplier implements EventBufferSupplier { } } } - return eventBuffer; } /** diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java index 668681d..a2040ac 100644 --- a/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java +++ b/src/test/java/org/apache/geode/kafka/GeodeAsSinkDUnitTest.java @@ -54,16 +54,16 @@ import org.apache.geode.test.dunit.rules.MemberVM; @RunWith(Parameterized.class) public class GeodeAsSinkDUnitTest { @Rule - public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); + public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); @Rule - public TestName testName = new TestName(); + public final TestName testName = new TestName(); @ClassRule - public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder(); + public static final TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder(); @Rule - public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder(); + public final TemporaryFolder temporaryFolderForOffset = new TemporaryFolder(); @BeforeClass public static void setup() @@ -149,11 +149,11 @@ public class GeodeAsSinkDUnitTest { Producer<String, String> producer = createProducer(); for (int i = 0; i < NUM_EVENT; i++) { - producer.send(new ProducerRecord(sinkTopic, "KEY" + i, "VALUE" + i)); + producer.send(new ProducerRecord<>(sinkTopic, "KEY" + i, "VALUE" + i)); } client1.invoke(() -> { - Region region = ClusterStartupRule.getClientCache().getRegion(sinkRegion); + Region<Object, Object> region = ClusterStartupRule.getClientCache().getRegion(sinkRegion); await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); }); diff --git a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java index f00965c..053f4ce 100644 --- a/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java +++ b/src/test/java/org/apache/geode/kafka/GeodeAsSourceDUnitTest.java @@ -52,16 +52,16 @@ import org.apache.geode.test.dunit.rules.MemberVM; @RunWith(Parameterized.class) public class GeodeAsSourceDUnitTest { @Rule - public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); + public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); @Rule - public TestName testName = new TestName(); + public final TestName testName = new TestName(); @ClassRule - public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder(); + public static final TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder(); @Rule - public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder(); + public final TemporaryFolder temporaryFolderForOffset = new TemporaryFolder(); @BeforeClass public static void setup() @@ -148,7 +148,7 @@ public class GeodeAsSourceDUnitTest { // Insert data into the Apache Geode source from the client client1.invoke(() -> { - Region region = ClusterStartupRule.getClientCache().getRegion(sourceRegion); + Region<Object, Object> region = ClusterStartupRule.getClientCache().getRegion(sourceRegion); for (int i = 0; i < NUM_EVENT; i++) { region.put("KEY" + i, "VALUE" + i); } diff --git a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java index 9e46478..4461dc4 100644 --- a/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java +++ b/src/test/java/org/apache/geode/kafka/GeodeConnectorConfigTest.java @@ -15,6 +15,7 @@ package org.apache.geode.kafka; +import static org.apache.geode.kafka.GeodeConnectorConfig.parseStringByComma; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; @@ -39,8 +40,7 @@ public class GeodeConnectorConfigTest { @Test public void parseRegionNamesShouldSplitOnComma() { - GeodeConnectorConfig config = new GeodeConnectorConfig(); - List<String> regionNames = config.parseStringByComma("region1,region2,region3,region4"); + List<String> regionNames = parseStringByComma("region1,region2,region3,region4"); assertEquals(4, regionNames.size()); assertThat(true, allOf(is(regionNames.contains("region1")), is(regionNames.contains("region2")), is(regionNames.contains("region3")), is(regionNames.contains("region4")))); @@ -48,8 +48,7 @@ public class GeodeConnectorConfigTest { @Test public void parseRegionNamesShouldChomp() { - GeodeConnectorConfig config = new GeodeConnectorConfig(); - List<String> regionNames = config.parseStringByComma("region1, region2, region3,region4"); + List<String> regionNames = parseStringByComma("region1, region2, region3,region4"); assertEquals(4, regionNames.size()); assertThat(true, allOf(is(regionNames.contains("region1")), @@ -118,7 +117,7 @@ public class GeodeConnectorConfigTest { public void configurationShouldReturnRegionToTopicsMappingWhenParseRegionToTopics(String value) { Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics(value); assertEquals(2, regionToTopics.size()); - assertTrue(regionToTopics.get("region1") != null); + assertNotNull(regionToTopics.get("region1")); assertEquals(1, regionToTopics.get("region1").size()); assertTrue(regionToTopics.get("region1").contains("topic1")); } @@ -127,7 +126,7 @@ public class GeodeConnectorConfigTest { public void regionToTopicParsingShouldParseCorrectlyWithASingleBinding() { Map<String, List<String>> regionToTopics = GeodeConnectorConfig.parseRegionToTopics("[region1:topic1]"); - assertTrue(regionToTopics.get("region1") != null); + assertNotNull(regionToTopics.get("region1")); assertEquals(1, regionToTopics.get("region1").size()); assertTrue(regionToTopics.get("region1").contains("topic1")); } diff --git a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java index d62ccc1..e05a962 100644 --- a/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java +++ b/src/test/java/org/apache/geode/kafka/converter/JsonPdxConverterDUnitTest.java @@ -48,6 +48,7 @@ import org.apache.geode.cache.client.ClientRegionShortcut; import org.apache.geode.kafka.utilities.KafkaLocalCluster; import org.apache.geode.kafka.utilities.TestObject; import org.apache.geode.kafka.utilities.WorkerAndHerderCluster; +import org.apache.geode.pdx.FieldType; import org.apache.geode.pdx.PdxInstance; import org.apache.geode.pdx.PdxInstanceFactory; import org.apache.geode.pdx.ReflectionBasedAutoSerializer; @@ -170,8 +171,7 @@ public class JsonPdxConverterDUnitTest { .forEach(field -> { try { Object value = field.get(originalObject); - Class type = field.getType(); - instanceFactory.writeField(field.getName(), value, type); + instanceFactory.writeField(field.getName(), value, Object.class); } catch (IllegalAccessException ignore) { } }); diff --git a/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java index 9471f48..d37c0eb 100644 --- a/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java +++ b/src/test/java/org/apache/geode/kafka/sink/BatchRecordsTest.java @@ -20,19 +20,20 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.Collection; -import java.util.Map; +import java.util.ArrayList; +import java.util.HashMap; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Test; import org.apache.geode.cache.Region; +@SuppressWarnings("unchecked") public class BatchRecordsTest { @Test public void updatingARecordShouldRemoveFromTheRemoveListIfNullValuesIsRemoveBooleanIsSet() { - Map updates = mock(Map.class); - Collection removes = mock(Collection.class); + HashMap<Object, Object> updates = mock(HashMap.class); + ArrayList<Object> removes = mock(ArrayList.class); when(removes.contains(any())).thenReturn(true); BatchRecords records = new BatchRecords(updates, removes); SinkRecord sinkRecord = mock(SinkRecord.class); @@ -42,8 +43,8 @@ public class BatchRecordsTest { @Test public void updatingARecordShouldAddToTheUpdateMap() { - Map updates = mock(Map.class); - Collection removes = mock(Collection.class); + HashMap<Object, Object> updates = mock(HashMap.class); + ArrayList<Object> removes = mock(ArrayList.class); when(removes.contains(any())).thenReturn(false); BatchRecords records = new BatchRecords(updates, removes); SinkRecord sinkRecord = mock(SinkRecord.class); @@ -53,8 +54,8 @@ public class BatchRecordsTest { @Test public void updatingARecordShouldNotRemoveFromTheRemoveListIfNullValuesIsNotSet() { - Map updates = mock(Map.class); - Collection removes = mock(Collection.class); + HashMap<Object, Object> updates = mock(HashMap.class); + ArrayList<Object> removes = mock(ArrayList.class); when(removes.contains(any())).thenReturn(true); BatchRecords records = new BatchRecords(updates, removes); SinkRecord sinkRecord = mock(SinkRecord.class); @@ -65,8 +66,8 @@ public class BatchRecordsTest { @Test public void removingARecordShouldRemoveFromTheUpdateMapIfKeyIsPresent() { - Map updates = mock(Map.class); - Collection removes = mock(Collection.class); + HashMap<Object, Object> updates = mock(HashMap.class); + ArrayList<Object> removes = mock(ArrayList.class); when(updates.containsKey(any())).thenReturn(true); BatchRecords records = new BatchRecords(updates, removes); SinkRecord sinkRecord = mock(SinkRecord.class); @@ -76,8 +77,8 @@ public class BatchRecordsTest { @Test public void removingARecordAddToTheRemoveCollection() { - Map updates = mock(Map.class); - Collection removes = mock(Collection.class); + HashMap<Object, Object> updates = mock(HashMap.class); + ArrayList<Object> removes = mock(ArrayList.class); BatchRecords records = new BatchRecords(updates, removes); SinkRecord sinkRecord = mock(SinkRecord.class); records.addRemoveOperation(sinkRecord); @@ -86,7 +87,7 @@ public class BatchRecordsTest { @Test public void executeOperationsShouldInvokePutAllAndRemoveAll() { - Region region = mock(Region.class); + Region<Object, Object> region = mock(Region.class); BatchRecords records = new BatchRecords(); records.executeOperations(region); verify(region, times(1)).putAll(any()); diff --git a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java index c71d7ba..85b9954 100644 --- a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java +++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTaskTest.java @@ -35,11 +35,11 @@ import org.apache.geode.kafka.GeodeConnectorConfig; public class GeodeKafkaSinkTaskTest { - private HashMap<String, String> createTestSinkProps(boolean nullMeansRemove) { + private HashMap<String, String> createTestSinkProps() { HashMap<String, String> props = new HashMap<>(); props.put(TOPIC_TO_REGION_BINDINGS, "[topic:region]"); props.put(GeodeConnectorConfig.TASK_ID, "0"); - props.put(NULL_VALUES_MEAN_REMOVE, String.valueOf(nullMeansRemove)); + props.put(NULL_VALUES_MEAN_REMOVE, "true"); props.put(GeodeConnectorConfig.LOCATORS, "localhost[10334]"); return props; } @@ -47,19 +47,19 @@ public class GeodeKafkaSinkTaskTest { @Test public void putRecordsAddsToRegionBatchRecords() { GeodeKafkaSinkTask task = new GeodeKafkaSinkTask(); - HashMap<String, String> props = createTestSinkProps(true); + HashMap<String, String> props = createTestSinkProps(); SinkRecord topicRecord = mock(SinkRecord.class); when(topicRecord.topic()).thenReturn("topic"); when(topicRecord.value()).thenReturn("value"); when(topicRecord.key()).thenReturn("key"); - List<SinkRecord> records = new ArrayList(); + List<SinkRecord> records = new ArrayList<>(); records.add(topicRecord); - HashMap<String, Region> regionNameToRegion = new HashMap<>(); + HashMap<String, Region<Object, Object>> regionNameToRegion = new HashMap<>(); GeodeSinkConnectorConfig geodeSinkConnectorConfig = new GeodeSinkConnectorConfig(props); - HashMap<String, BatchRecords> batchRecordsMap = new HashMap(); + HashMap<String, BatchRecords> batchRecordsMap = new HashMap<>(); BatchRecords batchRecords = mock(BatchRecords.class); batchRecordsMap.put("region", batchRecords); task.configure(geodeSinkConnectorConfig); @@ -73,19 +73,19 @@ public class GeodeKafkaSinkTaskTest { @Test public void newBatchRecordsAreCreatedIfOneDoesntExist() { GeodeKafkaSinkTask task = new GeodeKafkaSinkTask(); - HashMap<String, String> props = createTestSinkProps(true); + HashMap<String, String> props = createTestSinkProps(); SinkRecord topicRecord = mock(SinkRecord.class); when(topicRecord.topic()).thenReturn("topic"); when(topicRecord.value()).thenReturn("value"); when(topicRecord.key()).thenReturn("key"); - List<SinkRecord> records = new ArrayList(); + List<SinkRecord> records = new ArrayList<>(); records.add(topicRecord); - HashMap<String, Region> regionNameToRegion = new HashMap<>(); + HashMap<String, Region<Object, Object>> regionNameToRegion = new HashMap<>(); GeodeSinkConnectorConfig geodeSinkConnectorConfig = new GeodeSinkConnectorConfig(props); - HashMap<String, BatchRecords> batchRecordsMap = new HashMap(); + HashMap<String, BatchRecords> batchRecordsMap = new HashMap<>(); task.configure(geodeSinkConnectorConfig); task.setRegionNameToRegion(regionNameToRegion); diff --git a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java index 0ae65da..d3974cb 100644 --- a/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java +++ b/src/test/java/org/apache/geode/kafka/sink/GeodeKafkaSinkTest.java @@ -37,7 +37,7 @@ public class GeodeKafkaSinkTest { @Test public void taskConfigsCreatesMaxNumberOfTasks() { GeodeKafkaSink sink = new GeodeKafkaSink(); - Map<String, String> props = new HashMap(); + Map<String, String> props = new HashMap<>(); props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]"); sink.start(props); Collection<Map<String, String>> tasks = sink.taskConfigs(5); @@ -47,7 +47,7 @@ public class GeodeKafkaSinkTest { @Test public void sinkTaskConfigsAllAssignedEntireTopicToRegionBinding() { GeodeKafkaSink sink = new GeodeKafkaSink(); - Map<String, String> props = new HashMap(); + Map<String, String> props = new HashMap<>(); props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]"); sink.start(props); Collection<Map<String, String>> tasks = sink.taskConfigs(5); @@ -59,11 +59,11 @@ public class GeodeKafkaSinkTest { @Test public void eachTaskHasUniqueTaskIds() { GeodeKafkaSink sink = new GeodeKafkaSink(); - Map<String, String> props = new HashMap(); + Map<String, String> props = new HashMap<>(); props.put(TOPIC_TO_REGION_BINDINGS, "[someTopic:someRegion]"); sink.start(props); Collection<Map<String, String>> tasks = sink.taskConfigs(5); - HashSet<String> seenIds = new HashSet(); + HashSet<String> seenIds = new HashSet<>(); for (Map<String, String> taskProp : tasks) { assertTrue(seenIds.add(taskProp.get(GeodeConnectorConfig.TASK_ID))); } diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java index 07c9f0b..a85786b 100644 --- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java +++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTaskTest.java @@ -40,20 +40,21 @@ import org.junit.Test; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.query.CqAttributes; import org.apache.geode.cache.query.CqEvent; +import org.apache.geode.cache.query.CqQuery; import org.apache.geode.cache.query.CqResults; import org.apache.geode.cache.query.Struct; import org.apache.geode.cache.query.internal.ResultsBag; import org.apache.geode.kafka.GeodeContext; - +@SuppressWarnings("unchecked") public class GeodeKafkaSourceTaskTest { @Test public void whenLoadingEntireRegionAbleToPutInitialResultsIntoEventBuffer() { GeodeContext geodeContext = mock(GeodeContext.class); - BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100); - CqResults fakeInitialResults = new ResultsBag(); + BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100); + CqResults<Object> fakeInitialResults = new ResultsBag(); for (int i = 0; i < 10; i++) { fakeInitialResults.add(mock(Struct.class)); } @@ -69,14 +70,14 @@ public class GeodeKafkaSourceTaskTest { @Test public void whenNotLoadingEntireRegionShouldNotPutInitialResultsIntoEventBuffer() { GeodeContext geodeContext = mock(GeodeContext.class); - BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100); - CqResults fakeInitialResults = new ResultsBag(); + BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100); + CqResults<Object> fakeInitialResults = new ResultsBag(); for (int i = 0; i < 10; i++) { fakeInitialResults.add(mock(CqEvent.class)); } - when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())) - .thenReturn(fakeInitialResults); + when(geodeContext.newCq(anyString(), anyString(), any(), anyBoolean())) + .thenReturn(mock(CqQuery.class)); GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), "testRegion", DEFAULT_CQ_PREFIX, false, false); @@ -86,10 +87,10 @@ public class GeodeKafkaSourceTaskTest { @Test public void cqListenerOnEventPopulatesEventsBuffer() { GeodeContext geodeContext = mock(GeodeContext.class); - BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100); + BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue<>(100); - when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(), anyBoolean())) - .thenReturn(mock(CqResults.class)); + when(geodeContext.newCq(anyString(), anyString(), any(), anyBoolean())) + .thenReturn(mock(CqQuery.class)); GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); GeodeKafkaSourceListener listener = task.installListenersToRegion(geodeContext, 1, createEventBufferSupplier(eventBuffer), @@ -121,11 +122,12 @@ public class GeodeKafkaSourceTaskTest { when(geodeContext.getClientCache()).thenReturn(clientCache); Map<String, List<String>> regionToTopicsMap = new HashMap<>(); - regionToTopicsMap.put("region1", new ArrayList()); + regionToTopicsMap.put("region1", new ArrayList<>()); GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class); when(config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet()); - + when(geodeContext.newCq(anyString(), anyString(), any(), anyBoolean())) + .thenReturn(mock(CqQuery.class)); GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); task.installOnGeode(config, geodeContext, null, "someCqPrefix", false); verify(geodeContext, times(1)).newCq(anyString(), anyString(), any(), anyBoolean()); @@ -140,7 +142,7 @@ public class GeodeKafkaSourceTaskTest { when(geodeContext.newCqWithInitialResults(anyString(), anyString(), any(CqAttributes.class), anyBoolean())).thenReturn(new ResultsBag()); Map<String, List<String>> regionToTopicsMap = new HashMap<>(); - regionToTopicsMap.put("region1", new ArrayList()); + regionToTopicsMap.put("region1", new ArrayList<>()); GeodeSourceConnectorConfig config = mock(GeodeSourceConnectorConfig.class); when(config.getCqsToRegister()).thenReturn(regionToTopicsMap.keySet()); @@ -167,31 +169,6 @@ public class GeodeKafkaSourceTaskTest { } @Test - public void pollReturnsEventsWhenEventBufferHasValues() throws Exception { - // BlockingQueue<GeodeEvent> eventBuffer = new LinkedBlockingQueue(100); - // CqEvent cqEvent = mock(CqEvent.class); - // when(cqEvent.getNewValue()).thenReturn("New Value"); - // GeodeEvent event = mock(GeodeEvent.class); - // when(event.getEvent()).thenReturn(cqEvent); - // eventBuffer.add(event); - // - // List<String> topics = new ArrayList<>(); - // topics.add("myTopic"); - // - // GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); - // task.startForTesting(eventBuffer, topics, 1); - // List<SourceRecord> records = task.poll(); - // assertEquals(1, records.size()); - } - - @Test - public void installOnGeodeShouldCallCq() { - GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); - } - - - - @Test public void createSourcePartitionsShouldReturnAMapOfSourcePartitions() { GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); List<String> regionNames = Arrays.asList("region1", "region2", "region3"); @@ -202,35 +179,7 @@ public class GeodeKafkaSourceTaskTest { assertThat(true, is(sourcePartitions.get("region3").get(REGION_PARTITION).equals("region3"))); } - @Test - public void listOfLocatorsShouldBeConfiguredIntoClientCache() { - - } - - @Test - public void shouldNotBeDurableIfDurableClientIdIsNull() { - - } - - @Test - public void shouldNotCallReadyForEventsIfDurableClientPrefixIsEmpty() { - - } - - - @Test - public void cqPrefixShouldBeProperlyCalculatedFromProps() { - // GeodeContext geodeContext = mock(GeodeContext.class); - // GeodeKafkaSourceTask task = new GeodeKafkaSourceTask(); - } - - private EventBufferSupplier createEventBufferSupplier(BlockingQueue<GeodeEvent> eventBuffer) { - return new EventBufferSupplier() { - @Override - public BlockingQueue<GeodeEvent> get() { - return eventBuffer; - } - }; + return () -> eventBuffer; } } diff --git a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java index 38e9498..3019101 100644 --- a/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java +++ b/src/test/java/org/apache/geode/kafka/source/GeodeKafkaSourceTest.java @@ -32,7 +32,7 @@ public class GeodeKafkaSourceTest { @Test public void taskConfigsCreatesMaxNumberOfTasks() { GeodeKafkaSource source = new GeodeKafkaSource(); - Map<String, String> props = new HashMap(); + HashMap<String, String> props = new HashMap<>(); props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]"); source.start(props); Collection<Map<String, String>> tasks = source.taskConfigs(5); @@ -42,7 +42,7 @@ public class GeodeKafkaSourceTest { @Test public void sourceTaskConfigsAllAssignedEntireRegionToTopicBinding() { GeodeKafkaSource source = new GeodeKafkaSource(); - Map<String, String> props = new HashMap(); + HashMap<String, String> props = new HashMap<>(); props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]"); source.start(props); Collection<Map<String, String>> tasks = source.taskConfigs(5); @@ -54,11 +54,11 @@ public class GeodeKafkaSourceTest { @Test public void eachTaskHasUniqueTaskIds() { GeodeKafkaSource sink = new GeodeKafkaSource(); - Map<String, String> props = new HashMap(); + HashMap<String, String> props = new HashMap<>(); props.put(REGION_TO_TOPIC_BINDINGS, "[someRegion:someTopic]"); sink.start(props); Collection<Map<String, String>> tasks = sink.taskConfigs(5); - HashSet<String> seenIds = new HashSet(); + HashSet<String> seenIds = new HashSet<>(); for (Map<String, String> taskProp : tasks) { assertTrue(seenIds.add(taskProp.get(GeodeConnectorConfig.TASK_ID))); } diff --git a/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java index 198e129..a2b38f2 100644 --- a/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java +++ b/src/test/java/org/apache/geode/kafka/utilities/GeodeKafkaTestUtils.java @@ -44,15 +44,13 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.junit.rules.TemporaryFolder; public class GeodeKafkaTestUtils { - public static ZooKeeperLocalCluster startZooKeeper(Properties zookeeperProperties) + public static void startZooKeeper(Properties zookeeperProperties) throws IOException, QuorumPeerConfig.ConfigException { ZooKeeperLocalCluster zooKeeperLocalCluster = new ZooKeeperLocalCluster(zookeeperProperties); zooKeeperLocalCluster.start(); - return zooKeeperLocalCluster; } - public static KafkaLocalCluster startKafka(Properties kafkaProperties) - throws IOException, InterruptedException { + public static KafkaLocalCluster startKafka(Properties kafkaProperties) { KafkaLocalCluster kafkaLocalCluster = new KafkaLocalCluster(kafkaProperties); kafkaLocalCluster.start(); return kafkaLocalCluster; @@ -85,9 +83,7 @@ public class GeodeKafkaTestUtils { StringSerializer.class.getName()); // Create the producer using props. - final Producer<String, String> producer = - new KafkaProducer<>(props); - return producer; + return new KafkaProducer<>(props); } public static Properties getZooKeeperProperties(TemporaryFolder temporaryFolder) diff --git a/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java b/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java index cb66f84..7f3016f 100644 --- a/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java +++ b/src/test/java/org/apache/geode/kafka/utilities/JavaProcess.java @@ -20,13 +20,13 @@ import java.io.IOException; public class JavaProcess { public Process process; - Class classWithMain; + final Class<?> classWithMain; - public JavaProcess(Class classWithMain) { + public JavaProcess(Class<?> classWithMain) { this.classWithMain = classWithMain; } - public void exec(String... args) throws IOException, InterruptedException { + public void exec(String... args) throws IOException { String java = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java"; String classpath = System.getProperty("java.class.path"); @@ -44,10 +44,6 @@ public class JavaProcess { process = builder.inheritIO().start(); } - public void waitFor() throws InterruptedException { - process.waitFor(); - } - public void destroy() { process.destroy(); } diff --git a/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java b/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java index f342aa4..c379615 100644 --- a/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java +++ b/src/test/java/org/apache/geode/kafka/utilities/KafkaLocalCluster.java @@ -14,7 +14,6 @@ */ package org.apache.geode.kafka.utilities; -import java.io.IOException; import java.util.Properties; import kafka.server.KafkaConfig; @@ -22,9 +21,9 @@ import kafka.server.KafkaServerStartable; public class KafkaLocalCluster { - KafkaServerStartable kafka; + final KafkaServerStartable kafka; - public KafkaLocalCluster(Properties kafkaProperties) throws IOException, InterruptedException { + public KafkaLocalCluster(Properties kafkaProperties) { KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); kafka = new KafkaServerStartable(kafkaConfig); } diff --git a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java index 098b2e6..000842e 100644 --- a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java +++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderCluster.java @@ -18,20 +18,16 @@ import java.io.IOException; public class WorkerAndHerderCluster { - private JavaProcess workerAndHerder; + private final JavaProcess workerAndHerder; public WorkerAndHerderCluster() { workerAndHerder = new JavaProcess(WorkerAndHerderWrapper.class); } - public void start(String maxTasks) throws IOException, InterruptedException { - workerAndHerder.exec(maxTasks); - } - public void start(String maxTasks, String sourceRegion, String sinkRegion, String sourceTopic, - String sinkTopic, String offsetPath, String locatorString, String keyConverter, - String keyConverterArgs, String valueConverter, String valueConverterArgs) - throws IOException, InterruptedException { + String sinkTopic, String offsetPath, String locatorString, String keyConverter, + String keyConverterArgs, String valueConverter, String valueConverterArgs) + throws IOException { String[] args = new String[] {maxTasks, sourceRegion, sinkRegion, sourceTopic, sinkTopic, offsetPath, locatorString, keyConverter, keyConverterArgs, valueConverter, valueConverterArgs}; diff --git a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java index 3723018..5c7a7e4 100644 --- a/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java +++ b/src/test/java/org/apache/geode/kafka/utilities/WorkerAndHerderWrapper.java @@ -16,7 +16,6 @@ package org.apache.geode.kafka.utilities; import static org.apache.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS; -import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -41,7 +40,7 @@ import org.apache.geode.kafka.source.GeodeKafkaSource; public class WorkerAndHerderWrapper { - public static void main(String[] args) throws IOException { + public static void main(String[] args) { if (args.length != 11) { throw new RuntimeException("Insufficient arguments to start workers and herders"); } @@ -67,7 +66,7 @@ public class WorkerAndHerderWrapper { valueConverterProps = parseArguments(valueConverterArgs, false); } - Map props = new HashMap(); + HashMap<String, String> props = new HashMap<>(); props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put("offset.storage.file.filename", offsetPath); // fast flushing for testing. diff --git a/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java b/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java index 8e5e7a9..42a237c 100644 --- a/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java +++ b/src/test/java/org/apache/geode/kafka/utilities/ZooKeeperLocalCluster.java @@ -14,6 +14,8 @@ */ package org.apache.geode.kafka.utilities; +import static org.junit.Assert.fail; + import java.io.IOException; import java.util.Properties; @@ -25,7 +27,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig; public class ZooKeeperLocalCluster { ZooKeeperServerMain zooKeeperServer; - private Properties zooKeeperProperties; + private final Properties zooKeeperProperties; Thread zooKeeperThread; public ZooKeeperLocalCluster(Properties zooKeeperProperties) { @@ -40,16 +42,13 @@ public class ZooKeeperLocalCluster { final ServerConfig configuration = new ServerConfig(); configuration.readFrom(quorumConfiguration); - zooKeeperThread = new Thread() { - public void run() { - try { - zooKeeperServer.runFromConfig(configuration); - } catch (IOException | AdminServer.AdminServerException e) { - System.out.println("ZooKeeper Failed"); - e.printStackTrace(System.err); - } + zooKeeperThread = new Thread(() -> { + try { + zooKeeperServer.runFromConfig(configuration); + } catch (IOException | AdminServer.AdminServerException e) { + fail("Unable to start ZooKeeper cluster"); } - }; + }); zooKeeperThread.start(); System.out.println("ZooKeeper thread started");