MINOR: Code Cleanup

This cleanup includes:

- Converting try-catch-finally blocks to try-with-resources where possible (see the sketch after this list)
- Removing some seemingly unnecessary `SuppressWarnings` annotations
- Resolving assorted Java compiler warnings
- Closing unclosed `Closeable` objects
- Removing unused code
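
For reference, the try-with-resources conversions follow the pattern sketched below (a minimal, illustrative example; the class and method names are hypothetical, but the FileOffsetBackingStore.load() hunk further down applies exactly this transformation):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;

    public class TryWithResourcesSketch {
        // Before: the stream is leaked if readObject() throws before close().
        static Object loadOld(File file) throws IOException, ClassNotFoundException {
            ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
            Object obj = is.readObject();
            is.close();
            return obj;
        }

        // After: ObjectInputStream implements java.io.Closeable, so
        // try-with-resources closes it automatically, even if the body throws.
        static Object loadNew(File file) throws IOException, ClassNotFoundException {
            try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(file))) {
                return is.readObject();
            }
        }
    }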

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Balint Molnar <balintmolna...@gmail.com>, Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <matth...@confluent.io>, Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #3222 from vahidhashemian/minor/code_cleanup_1706


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f87d58b7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f87d58b7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f87d58b7

Branch: refs/heads/trunk
Commit: f87d58b796977fdaefb089d17cb30b2071cd4485
Parents: 3bfc073
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Wed Jul 19 10:51:28 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Jul 19 10:51:28 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   3 +-
 .../producer/internals/TransactionManager.java  |   1 -
 .../requests/OffsetsForLeaderEpochRequest.java  |   4 +-
 .../clients/consumer/KafkaConsumerTest.java     |  17 +-
 .../clients/producer/KafkaProducerTest.java     |  33 ++--
 .../clients/producer/MockProducerTest.java      |  71 ++++++-
 .../clients/producer/internals/SenderTest.java  |   9 +-
 .../utils/ByteBufferOutputStreamTest.java       |  10 +-
 .../connect/file/FileStreamSourceTaskTest.java  |   1 +
 .../connect/runtime/isolation/Plugins.java      |   1 -
 .../connect/storage/FileOffsetBackingStore.java |   4 +-
 .../runtime/WorkerSinkTaskThreadedTest.java     |   1 -
 .../storage/KafkaConfigBackingStoreTest.java    |  17 +-
 .../kafka/connect/util/TopicAdminTest.java      |   1 -
 .../kafka/connect/transforms/CastTest.java      | 144 ++++++--------
 .../connect/transforms/ExtractFieldTest.java    |   9 +-
 .../kafka/connect/transforms/FlattenTest.java   |  54 +++---
 .../connect/transforms/HoistFieldTest.java      |   9 +-
 .../connect/transforms/InsertFieldTest.java     |  14 +-
 .../connect/transforms/RegexRouterTest.java     |   5 +-
 .../connect/transforms/ReplaceFieldTest.java    |  11 +-
 .../transforms/SetSchemaMetadataTest.java       |  11 +-
 .../transforms/TimestampConverterTest.java      | 128 ++++++-------
 .../connect/transforms/TimestampRouterTest.java |   8 +-
 .../connect/transforms/ValueToKeyTest.java      |   9 +-
 .../apache/kafka/streams/kstream/KStream.java   |   1 -
 .../kafka/streams/kstream/KStreamBuilder.java   |   6 +-
 .../kstream/internals/KGroupedStreamImpl.java   |   7 -
 .../kstream/internals/KTableKTableJoin.java     |   1 -
 .../kstream/internals/KTableKTableLeftJoin.java |   1 -
 .../internals/KTableKTableOuterJoin.java        |   1 -
 .../internals/KTableKTableRightJoin.java        |   1 -
 .../internals/GlobalStateUpdateTask.java        |   1 -
 .../processor/internals/StreamThread.java       |   1 -
 .../internals/assignment/AssignmentInfo.java    |   4 +-
 .../apache/kafka/streams/state/StateSerdes.java |   1 -
 .../state/internals/CachingKeyValueStore.java   |   1 -
 .../state/internals/CachingWindowStore.java     |   1 -
 .../ChangeLoggingSegmentedBytesStore.java       |   1 -
 .../streams/state/internals/RocksDBStore.java   |   1 -
 .../streams/kstream/KStreamBuilderTest.java     |  10 +-
 ...reamSessionWindowAggregateProcessorTest.java |   2 -
 .../WindowedStreamPartitionerTest.java          |   6 +
 .../streams/processor/TopologyBuilderTest.java  |  14 +-
 .../processor/internals/QuickUnionTest.java     |   1 -
 .../processor/internals/StandbyTaskTest.java    |   2 -
 .../internals/StreamPartitionAssignorTest.java  |   1 -
 .../processor/internals/StreamTaskTest.java     |   1 -
 .../streams/state/KeyValueStoreTestDriver.java  |   1 -
 .../ChangeLoggingSegmentedBytesStoreTest.java   |   3 -
 .../CompositeReadOnlyKeyValueStoreTest.java     |   4 -
 .../DelegatingPeekingKeyValueIteratorTest.java  |   5 +
 ...gedSortedCacheKeyValueStoreIteratorTest.java |   2 +
 ...rtedCacheWrappedWindowStoreIteratorTest.java |   3 +
 .../MeteredSegmentedBytesStoreTest.java         |   1 -
 .../RocksDBKeyValueStoreSupplierTest.java       |   2 -
 .../RocksDBSessionStoreSupplierTest.java        |   3 -
 .../RocksDBWindowStoreSupplierTest.java         |   3 -
 .../state/internals/StoreChangeLoggerTest.java  |   2 -
 .../streams/tests/BrokerCompatibilityTest.java  |   1 +
 .../kafka/streams/tests/SmokeTestDriver.java    |   1 -
 .../kafka/tools/ClientCompatibilityTest.java    | 187 ++++++++++---------
 62 files changed, 419 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c54b739..5f854d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -302,7 +302,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 keySerializer, valueSerializer);
     }
 
-    @SuppressWarnings({"unchecked", "deprecation"})
+    @SuppressWarnings("unchecked")
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         try {
             log.trace("Starting the Kafka producer");
@@ -339,7 +339,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                 this.valueSerializer = ensureExtended(valueSerializer);
             }
-            
 
             // load interceptors and make sure they get clientId
             userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 8cee794..c5542e4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -628,7 +628,6 @@ public class TransactionManager {
         }
 
         @Override
-        @SuppressWarnings("unchecked")
         public void onComplete(ClientResponse response) {
             if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) {
                 fatalError(new RuntimeException("Detected more than one in-flight transactional request."));

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index f898a75..fc31d75 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -42,7 +42,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     }
 
     public static class Builder extends AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
-        private Map<TopicPartition, Integer> epochsByPartition = new HashMap();
+        private Map<TopicPartition, Integer> epochsByPartition = new HashMap<>();
 
         public Builder() {
             super(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
@@ -129,7 +129,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
-        Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap();
+        Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap<>();
         for (TopicPartition tp : epochsByPartition.keySet()) {
             errorResponse.put(tp, new EpochEndOffset(error, EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 219c3f6..9fd7e19 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -129,13 +129,12 @@ public class KafkaConsumerTest {
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
         try {
             new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+            Assert.fail("should have caught an exception and returned");
         } catch (KafkaException e) {
             assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
             assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
             assertEquals("Failed to construct kafka consumer", e.getMessage());
-            return;
         }
-        Assert.fail("should have caught an exception and returned");
     }
 
     @Test
@@ -1191,23 +1190,17 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptySubscription() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        consumer.subscribe(Collections.<String>emptyList());
-        try {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            consumer.subscribe(Collections.<String>emptyList());
             consumer.poll(0);
-        } finally {
-            consumer.close();
         }
     }
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptyUserAssignment() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        consumer.assign(Collections.<TopicPartition>emptySet());
-        try {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+            consumer.assign(Collections.<TopicPartition>emptySet());
             consumer.poll(0);
-        } finally {
-            consumer.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 56c1b18..dd62457 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -87,16 +87,13 @@ public class KafkaProducerTest {
 
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
-        try {
-            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
-                    props, new ByteArraySerializer(), new ByteArraySerializer());
+        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
+            fail("should have caught an exception and returned");
         } catch (KafkaException e) {
             assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
             assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
             assertEquals("Failed to construct kafka producer", e.getMessage());
-            return;
         }
-        fail("should have caught an exception and returned");
     }
 
     @Test
@@ -172,9 +169,7 @@ public class KafkaProducerTest {
         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         config.put(ProducerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
         config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
-                config, new ByteArraySerializer(), new ByteArraySerializer());
-        producer.close();
+        new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()).close();
     }
 
     @Test(expected = KafkaException.class)
@@ -355,7 +350,7 @@ public class KafkaProducerTest {
         }
         Assert.assertTrue("Topic should still exist in metadata", 
metadata.containsTopic(topic));
     }
-    
+
     @PrepareOnlyThisForTest(Metadata.class)
     @Test
     public void testHeaders() throws Exception {
@@ -369,8 +364,6 @@ public class KafkaProducerTest {
         MemberModifier.field(KafkaProducer.class, "metadata").set(producer, 
metadata);
 
         String topic = "topic";
-        Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
-
         final Cluster cluster = new Cluster(
                 "dummy",
                 Collections.singletonList(new Node(0, "host1", 1000)),
@@ -395,9 +388,9 @@ public class KafkaProducerTest {
 
         //ensure headers can be mutated pre send.
         record.headers().add(new RecordHeader("test", "header2".getBytes()));
-        
+
         producer.send(record, null);
-        
+
         //ensure headers are closed and cannot be mutated post send
         try {
             record.headers().add(new RecordHeader("test", "test".getBytes()));
@@ -405,7 +398,7 @@ public class KafkaProducerTest {
         } catch (IllegalStateException ise) {
             //expected
         }
-        
+
         //ensure existing headers are not changed, and last header for key is still original value
         assertTrue(Arrays.equals(record.headers().lastHeader("test").value(), "header2".getBytes()));
 
@@ -436,7 +429,7 @@ public class KafkaProducerTest {
             assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel());
         }
     }
-    
+
     @PrepareOnlyThisForTest(Metadata.class)
     @Test
     public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception {
@@ -445,7 +438,7 @@ public class KafkaProducerTest {
         props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1");
         String topic = "topic";
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
-        
+
         KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(),
                 new StringSerializer());
         Metadata metadata = PowerMock.createNiceMock(Metadata.class);
@@ -457,20 +450,18 @@ public class KafkaProducerTest {
             Collections.<String>emptySet(),
             Collections.<String>emptySet());
         EasyMock.expect(metadata.fetch()).andReturn(cluster).once();
-        
+
         // Mock interceptors field
         ProducerInterceptors interceptors = PowerMock.createMock(ProducerInterceptors.class);
         EasyMock.expect(interceptors.onSend(record)).andReturn(record);
         interceptors.onSendError(EasyMock.eq(record), EasyMock.<TopicPartition>notNull(), EasyMock.<Exception>notNull());
         EasyMock.expectLastCall();
         MemberModifier.field(KafkaProducer.class, "interceptors").set(producer, interceptors);
-        
+
         PowerMock.replay(metadata);
         EasyMock.replay(interceptors);
         producer.send(record);
-        
+
         EasyMock.verify(interceptors);
-        
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index eeb9b5f..d343194 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.test.MockSerializer;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -47,14 +48,23 @@ import static org.junit.Assert.fail;
 public class MockProducerTest {
 
     private final String topic = "topic";
-    private final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer());
+    private MockProducer<byte[], byte[]> producer;
     private final ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes());
     private final ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes());
 
+    private void buildMockProducer(boolean autoComplete) {
+        this.producer = new MockProducer<>(autoComplete, new MockSerializer(), new MockSerializer());
+    }
+
+    @After
+    public void cleanup() {
+        if (this.producer != null && !this.producer.closed())
+            this.producer.close();
+    }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void testAutoCompleteMock() throws Exception {
+        buildMockProducer(true);
         Future<RecordMetadata> metadata = producer.send(record1);
         assertTrue("Send should be immediately complete", metadata.isDone());
         assertFalse("Send should be successful", isError(metadata));
@@ -77,11 +87,12 @@ public class MockProducerTest {
         assertEquals("Partition should be correct", 1, 
metadata.get().partition());
         producer.clear();
         assertEquals("Clear should erase our history", 0, 
producer.history().size());
+        producer.close();
     }
 
     @Test
     public void testManualCompletion() throws Exception {
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        buildMockProducer(false);
         Future<RecordMetadata> md1 = producer.send(record1);
         assertFalse("Send shouldn't have completed", md1.isDone());
         Future<RecordMetadata> md2 = producer.send(record2);
@@ -98,7 +109,7 @@ public class MockProducerTest {
             assertEquals(e, err.getCause());
         }
         assertFalse("No more requests to complete", producer.completeNext());
-        
+
         Future<RecordMetadata> md3 = producer.send(record1);
         Future<RecordMetadata> md4 = producer.send(record2);
         assertTrue("Requests should not be completed.", !md3.isDone() && 
!md4.isDone());
@@ -108,12 +119,14 @@ public class MockProducerTest {
 
     @Test
     public void shouldInitTransactions() {
+        buildMockProducer(true);
         producer.initTransactions();
         assertTrue(producer.transactionInitialized());
     }
 
     @Test
     public void shouldThrowOnInitTransactionIfProducerAlreadyInitializedForTransactions() {
+        buildMockProducer(true);
         producer.initTransactions();
         try {
             producer.initTransactions();
@@ -123,11 +136,13 @@ public class MockProducerTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowOnBeginTransactionIfTransactionsNotInitialized() {
+        buildMockProducer(true);
         producer.beginTransaction();
     }
 
     @Test
     public void shouldBeginTransactions() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
         assertTrue(producer.transactionInFlight());
@@ -135,11 +150,13 @@ public class MockProducerTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowOnSendOffsetsToTransactionIfTransactionsNotInitialized() {
+        buildMockProducer(true);
         producer.sendOffsetsToTransaction(null, null);
     }
 
     @Test
     public void shouldThrowOnSendOffsetsToTransactionTransactionIfNoTransactionGotStarted() {
+        buildMockProducer(true);
         producer.initTransactions();
         try {
             producer.sendOffsetsToTransaction(null, null);
@@ -149,11 +166,13 @@ public class MockProducerTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowOnCommitIfTransactionsNotInitialized() {
+        buildMockProducer(true);
         producer.commitTransaction();
     }
 
     @Test
     public void shouldThrowOnCommitTransactionIfNoTransactionGotStarted() {
+        buildMockProducer(true);
         producer.initTransactions();
         try {
             producer.commitTransaction();
@@ -163,6 +182,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldCommitEmptyTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
         producer.commitTransaction();
@@ -173,6 +193,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldCountCommittedTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -183,6 +204,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldNotCountAbortedTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
 
         producer.beginTransaction();
@@ -195,11 +217,13 @@ public class MockProducerTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowOnAbortIfTransactionsNotInitialized() {
+        buildMockProducer(true);
         producer.abortTransaction();
     }
 
     @Test
     public void shouldThrowOnAbortTransactionIfNoTransactionGotStarted() {
+        buildMockProducer(true);
         producer.initTransactions();
         try {
             producer.abortTransaction();
@@ -209,6 +233,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldAbortEmptyTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
         producer.abortTransaction();
@@ -219,6 +244,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldAbortInFlightTransactionOnClose() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
         producer.close();
@@ -229,11 +255,13 @@ public class MockProducerTest {
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowFenceProducerIfTransactionsNotInitialized() {
+        buildMockProducer(true);
         producer.fenceProducer();
     }
 
     @Test
     public void shouldThrowOnBeginTransactionsIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -244,6 +272,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnSendIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -254,6 +283,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnFlushIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -264,6 +294,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnSendOffsetsToTransactionIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -274,6 +305,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnCommitTransactionIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -284,6 +316,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnAbortTransactionIfProducerGotFenced() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.fenceProducer();
         try {
@@ -294,6 +327,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldPublishMessagesOnlyAfterCommitIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -313,7 +347,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldFlushOnCommitForNonAutoCompleteIfTransactionsAreEnabled() {
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        buildMockProducer(false);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -331,6 +365,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldDropMessagesOnAbortIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
 
         producer.beginTransaction();
@@ -346,7 +381,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnAbortForNonAutoCompleteIfTransactionsAreEnabled() throws Exception {
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        buildMockProducer(false);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -359,6 +394,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldPreserveCommittedMessagesOnAbortIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
 
         producer.beginTransaction();
@@ -378,6 +414,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -410,6 +447,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnNullConsumerGroupIdWhenSendOffsetsToTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -421,6 +459,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
         producer.sendOffsetsToTransaction(Collections.<TopicPartition, OffsetAndMetadata>emptyMap(), "groupId");
@@ -429,6 +468,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldAddOffsetsWhenSendOffsetsToTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -445,6 +485,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldResetSentOffsetsFlagOnlyWhenBeginningNewTransaction() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -465,6 +506,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldPublishLatestAndCumulativeConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -501,6 +543,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldDropConsumerGroupOffsetsOnAbortIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -521,6 +564,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldPreserveCommittedConsumerGroupsOffsetsOnAbortIfTransactionsAreEnabled() {
+        buildMockProducer(true);
         producer.initTransactions();
         producer.beginTransaction();
 
@@ -545,6 +589,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnInitTransactionIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.initTransactions();
@@ -554,6 +599,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnSendIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.send(null);
@@ -563,6 +609,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnBeginTransactionIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.beginTransaction();
@@ -572,6 +619,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowSendOffsetsToTransactionIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.sendOffsetsToTransaction(null, null);
@@ -581,6 +629,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnCommitTransactionIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.commitTransaction();
@@ -590,6 +639,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnAbortTransactionIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.abortTransaction();
@@ -599,6 +649,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnCloseIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.close();
@@ -608,6 +659,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnFenceProducerIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.fenceProducer();
@@ -617,6 +669,7 @@ public class MockProducerTest {
 
     @Test
     public void shouldThrowOnFlushProducerIfProducerIsClosed() {
+        buildMockProducer(true);
         producer.close();
         try {
             producer.flush();
@@ -626,25 +679,27 @@ public class MockProducerTest {
 
     @Test
     public void shouldBeFlushedIfNoBufferedRecords() {
+        buildMockProducer(true);
         assertTrue(producer.flushed());
     }
 
     @Test
     public void shouldBeFlushedWithAutoCompleteIfBufferedRecords() {
+        buildMockProducer(true);
         producer.send(record1);
         assertTrue(producer.flushed());
     }
 
     @Test
     public void shouldNotBeFlushedWithNoAutoCompleteIfBufferedRecords() {
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        buildMockProducer(false);
         producer.send(record1);
         assertFalse(producer.flushed());
     }
 
     @Test
     public void shouldNotBeFlushedAfterFlush() {
-        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        buildMockProducer(false);
         producer.send(record1);
         producer.flush();
         assertTrue(producer.flushed());

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 0537a35..d587de4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -633,10 +633,9 @@ public class SenderTest {
         String topic = tp.topic();
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
-        Metrics m = new Metrics();
-        accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
-                new ApiVersions(), txnManager);
-        try {
+        try (Metrics m = new Metrics()) {
+            accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
+                    new ApiVersions(), txnManager);
             Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
@@ -701,8 +700,6 @@ public class SenderTest {
 
             assertTrue("There should be a split",
                     m.metrics().get(m.metricName("batch-split-rate", 
"producer-metrics")).value() > 0);
-        } finally {
-            m.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
index fbac719..26ba3b8 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.utils;
 
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -49,6 +50,7 @@ public class ByteBufferOutputStreamTest {
         byte[] bytes = new byte[5];
         buffer.get(bytes);
         assertArrayEquals("hello".getBytes(), bytes);
+        output.close();
     }
 
     @Test
@@ -75,19 +77,20 @@ public class ByteBufferOutputStreamTest {
         byte[] bytes = new byte[5];
         buffer.get(bytes);
         assertArrayEquals("hello".getBytes(), bytes);
+        output.close();
     }
 
     @Test
-    public void testWriteByteBuffer() {
+    public void testWriteByteBuffer() throws IOException {
         testWriteByteBuffer(ByteBuffer.allocate(16));
     }
 
     @Test
-    public void testWriteDirectByteBuffer() {
+    public void testWriteDirectByteBuffer() throws IOException {
         testWriteByteBuffer(ByteBuffer.allocateDirect(16));
     }
 
-    private void testWriteByteBuffer(ByteBuffer input) {
+    private void testWriteByteBuffer(ByteBuffer input) throws IOException {
         long value = 234239230L;
         input.putLong(value);
         input.flip();
@@ -97,6 +100,7 @@ public class ByteBufferOutputStreamTest {
         assertEquals(8, input.position());
         assertEquals(8, output.position());
         assertEquals(value, output.buffer().getLong(0));
+        output.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
index eb91dbd..03fb774 100644
--- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
+++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java
@@ -123,6 +123,7 @@ public class FileStreamSourceTaskTest {
         assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition());
         assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset());
 
+        os.close();
         task.stop();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 654f485..f7a5553 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -137,7 +137,6 @@ public class Plugins {
         return delegatingLoader.transformations();
     }
 
-    @SuppressWarnings("unchecked")
     public Connector newConnector(String connectorClassOrAlias) {
         Class<? extends Connector> klass;
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
index 32f5a38..0547fe6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
@@ -69,8 +69,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
 
     @SuppressWarnings("unchecked")
     private void load() {
-        try {
-            ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
+        try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(file))) {
             Object obj = is.readObject();
             if (!(obj instanceof HashMap))
                 throw new ConnectException("Expected HashMap but found " + 
obj.getClass());
@@ -81,7 +80,6 @@ public class FileOffsetBackingStore extends 
MemoryOffsetBackingStore {
                 ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
                 data.put(key, value);
             }
-            is.close();
         } catch (FileNotFoundException | EOFException e) {
             // FileNotFoundException: Ignore, may be new.
             // EOFException: Ignore, this means the file was missing or corrupt

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 29a6b52..6f77f65 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -115,7 +115,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     private long recordsReturned;
 
 
-    @SuppressWarnings("unchecked")
     @Override
     public void setup() {
         super.setup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 07d192b..e9dd18e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -135,7 +135,6 @@ public class KafkaConfigBackingStoreTest {
     KafkaBasedLog<String, byte[]> storeLog;
     private KafkaConfigBackingStore configStorage;
 
-    private String internalTopic;
     private Capture<String> capturedTopic = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
@@ -363,7 +362,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
                 new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
                 new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -403,7 +402,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
                 new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
                 new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -445,7 +444,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
                 new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
                 new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -494,7 +493,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
                 new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
                 new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -540,7 +539,7 @@ public class KafkaConfigBackingStoreTest {
                 // Connector after root update should make it through, task update shouldn't
                 new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
                 new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -594,7 +593,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
                 new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
 
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -640,7 +639,7 @@ public class KafkaConfigBackingStoreTest {
             new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
             new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)),
             new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
@@ -691,7 +690,7 @@ public class KafkaConfigBackingStoreTest {
                 new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
                 new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
                 new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index bafbce8..f90b77f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -49,7 +49,6 @@ public class TopicAdminTest {
     @Test
     public void returnNullWithApiVersionMismatch() {
         final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
-        boolean internal = false;
         Cluster cluster = createCluster(1);
         try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) {
             env.kafkaClient().setNode(cluster.controller());

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index 88afafc..b190189 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -34,42 +35,44 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class CastTest {
+    private final Cast<SourceRecord> xformKey = new Cast.Key<>();
+    private final Cast<SourceRecord> xformValue = new Cast.Value<>();
+
+    @After
+    public void teardown() {
+        xformKey.close();
+        xformValue.close();
+    }
 
     @Test(expected = ConfigException.class)
     public void testConfigEmpty() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, ""));
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, ""));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigInvalidSchemaType() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:faketype"));
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:faketype"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigInvalidTargetType() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array"));
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigInvalidMap() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra"));
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigMixWholeAndFieldTransformation() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32"));
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32"));
     }
 
     @Test
     public void castWholeRecordKeyWithSchema() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42, Schema.STRING_SCHEMA, "bogus"));
 
         assertEquals(Schema.Type.INT8, transformed.keySchema().type());
@@ -78,9 +81,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt8() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
@@ -89,9 +91,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt16() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.INT16, transformed.valueSchema().type());
@@ -100,9 +101,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt32() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.INT32, transformed.valueSchema().type());
@@ -111,9 +111,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaInt64() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.INT64, transformed.valueSchema().type());
@@ -122,9 +121,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaFloat32() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.FLOAT32, transformed.valueSchema().type());
@@ -133,9 +131,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaFloat64() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.FLOAT64, transformed.valueSchema().type());
@@ -144,9 +141,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaBooleanTrue() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.BOOLEAN, transformed.valueSchema().type());
@@ -155,9 +151,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaBooleanFalse() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 0));
 
         assertEquals(Schema.Type.BOOLEAN, transformed.valueSchema().type());
@@ -166,9 +161,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueWithSchemaString() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 Schema.INT32_SCHEMA, 42));
 
         assertEquals(Schema.Type.STRING, transformed.valueSchema().type());
@@ -178,9 +172,8 @@ public class CastTest {
     @Test
     public void castWholeRecordDefaultValue() {
         // Validate default value in schema is correctly converted
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 SchemaBuilder.float32().defaultValue(-42.125f).build(), 42.125f));
 
         assertEquals(Schema.Type.INT32, transformed.valueSchema().type());
@@ -190,9 +183,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordKeySchemaless() {
-        final Cast<SourceRecord> xform = new Cast.Key<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42, Schema.STRING_SCHEMA, "bogus"));
 
         assertNull(transformed.keySchema());
@@ -201,9 +193,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt8() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -212,9 +203,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt16() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -223,9 +213,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt32() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -234,9 +223,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessInt64() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -245,9 +233,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessFloat32() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -256,9 +243,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessFloat64() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -267,9 +253,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessBooleanTrue() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -278,9 +263,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessBooleanFalse() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 0));
 
         assertNull(transformed.valueSchema());
@@ -289,9 +273,8 @@ public class CastTest {
 
     @Test
     public void castWholeRecordValueSchemalessString() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, 42));
 
         assertNull(transformed.valueSchema());
@@ -300,16 +283,14 @@ public class CastTest {
 
     @Test(expected = DataException.class)
     public void castWholeRecordValueSchemalessUnsupportedType() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
-        xform.apply(new SourceRecord(null, null, "topic", 0, null, Collections.singletonList("foo")));
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
+        xformValue.apply(new SourceRecord(null, null, "topic", 0, null, Collections.singletonList("foo")));
     }
 
 
     @Test
     public void castFieldsWithSchema() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32"));
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32"));
 
         // Include an optional fields and fields with defaults to validate their values are passed through properly
         SchemaBuilder builder = SchemaBuilder.struct();
@@ -336,7 +317,7 @@ public class CastTest {
         recordValue.put("string", "42");
         // optional field intentionally omitted
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 supportedTypesSchema, recordValue));
 
         assertEquals((short) 8, ((Struct) transformed.value()).get("int8"));
@@ -356,8 +337,7 @@ public class CastTest {
     @SuppressWarnings("unchecked")
     @Test
     public void castFieldsSchemaless() {
-        final Cast<SourceRecord> xform = new Cast.Value<>();
-        xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32"));
+        xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32"));
         Map<String, Object> recordValue = new HashMap<>();
         recordValue.put("int8", (byte) 8);
         recordValue.put("int16", (short) 16);
@@ -367,7 +347,7 @@ public class CastTest {
         recordValue.put("float64", -64.);
         recordValue.put("boolean", true);
         recordValue.put("string", "42");
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
                 null, recordValue));
 
         assertNull(transformed.valueSchema());
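
The recurring shape in these transform-test diffs: a transform that used to be
constructed (and never closed) inside each test method becomes a field shared by
the whole class, and an @After hook closes it, which works because Connect's
Transformation interface extends Closeable. A condensed, self-contained sketch of
the post-cleanup shape, assembled from the Cast hunks above (the class name and
test body are illustrative, not part of the commit):

    import java.util.Collections;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.transforms.Cast;
    import org.junit.After;
    import org.junit.Test;

    import static org.junit.Assert.assertEquals;

    public class CastPatternSketch {
        // One Key and one Value transform shared by every test in the class
        private final Cast<SourceRecord> xformKey = new Cast.Key<>();
        private final Cast<SourceRecord> xformValue = new Cast.Value<>();

        @After
        public void teardown() {
            // Transformation extends Closeable; closing here covers every test
            xformKey.close();
            xformValue.close();
        }

        @Test
        public void castWholeRecordValue() {
            xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
            SourceRecord transformed = xformValue.apply(
                    new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42));
            assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
        }
    }

The same shape repeats in ExtractFieldTest, FlattenTest, HoistFieldTest,
InsertFieldTest, ReplaceFieldTest, and SetSchemaMetadataTest below.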

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
index b54a908..0b7ce96 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -28,10 +29,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class ExtractFieldTest {
+    private final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void schemaless() {
-        final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
         xform.configure(Collections.singletonMap("field", "magic"));
 
         final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
@@ -43,7 +49,6 @@ public class ExtractFieldTest {
 
     @Test
     public void withSchema() {
-        final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
         xform.configure(Collections.singletonMap("field", "magic"));
 
         final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
index 86851f3..d709054 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -36,25 +37,30 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class FlattenTest {
+    private final Flatten<SourceRecord> xformKey = new Flatten.Key<>();
+    private final Flatten<SourceRecord> xformValue = new Flatten.Value<>();
+
+    @After
+    public void teardown() {
+        xformKey.close();
+        xformValue.close();
+    }
 
     @Test(expected = DataException.class)
     public void topLevelStructRequired() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
-        xform.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42));
+        xformValue.configure(Collections.<String, String>emptyMap());
+        xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42));
     }
 
     @Test(expected = DataException.class)
     public void topLevelMapRequired() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
-        xform.apply(new SourceRecord(null, null, "topic", 0, null, 42));
+        xformValue.configure(Collections.<String, String>emptyMap());
+        xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42));
     }
 
     @Test
     public void testNestedStruct() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
 
         SchemaBuilder builder = SchemaBuilder.struct();
         builder.field("int8", Schema.INT8_SCHEMA);
@@ -93,7 +99,7 @@ public class FlattenTest {
         Struct twoLevelNestedStruct = new Struct(twoLevelNestedSchema);
         twoLevelNestedStruct.put("A", oneLevelNestedStruct);
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
                 "topic", 0,
                 twoLevelNestedSchema, twoLevelNestedStruct));
 
@@ -113,8 +119,7 @@ public class FlattenTest {
 
     @Test
     public void testNestedMapWithDelimiter() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.singletonMap("delimiter", "#"));
+        xformValue.configure(Collections.singletonMap("delimiter", "#"));
 
         Map<String, Object> supportedTypes = new HashMap<>();
         supportedTypes.put("int8", (byte) 8);
@@ -130,7 +135,7 @@ public class FlattenTest {
         Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", (Object) supportedTypes);
         Map<String, Object> twoLevelNestedMap = Collections.singletonMap("A", (Object) oneLevelNestedMap);
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
                 "topic", 0,
                 null, twoLevelNestedMap));
 
@@ -151,8 +156,7 @@ public class FlattenTest {
 
     @Test
     public void testOptionalFieldStruct() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
 
         SchemaBuilder builder = SchemaBuilder.struct();
         builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
@@ -168,7 +172,7 @@ public class FlattenTest {
         Struct oneLevelNestedStruct = new Struct(oneLevelNestedSchema);
         oneLevelNestedStruct.put("B", supportedTypes);
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
                 "topic", 0,
                 oneLevelNestedSchema, oneLevelNestedStruct));
 
@@ -179,15 +183,14 @@ public class FlattenTest {
 
     @Test
     public void testOptionalFieldMap() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
 
         Map<String, Object> supportedTypes = new HashMap<>();
         supportedTypes.put("opt_int32", null);
 
         Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", (Object) supportedTypes);
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null,
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
                 "topic", 0,
                 null, oneLevelNestedMap));
 
@@ -200,12 +203,11 @@ public class FlattenTest {
 
     @Test
     public void testKey() {
-        final Flatten<SourceRecord> xform = new Flatten.Key<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformKey.configure(Collections.<String, String>emptyMap());
 
         Map<String, Map<String, Integer>> key = Collections.singletonMap("A", Collections.singletonMap("B", 12));
         SourceRecord src = new SourceRecord(null, null, "topic", null, key, null, null);
-        SourceRecord transformed = xform.apply(src);
+        SourceRecord transformed = xformKey.apply(src);
 
         assertNull(transformed.keySchema());
         assertTrue(transformed.key() instanceof Map);
@@ -215,10 +217,9 @@ public class FlattenTest {
 
     @Test(expected = DataException.class)
     public void testUnsupportedTypeInMap() {
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
         Object value = Collections.singletonMap("foo", Arrays.asList("bar", "baz"));
-        xform.apply(new SourceRecord(null, null, "topic", 0, null, value));
+        xformValue.apply(new SourceRecord(null, null, "topic", 0, null, value));
     }
 
     @Test
@@ -227,8 +228,7 @@ public class FlattenTest {
         // children should also be optional. Similarly, if the parent Struct has a default value, the default value for
         // the flattened field
 
-        final Flatten<SourceRecord> xform = new Flatten.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
 
         SchemaBuilder builder = SchemaBuilder.struct().optional();
         builder.field("req_field", Schema.STRING_SCHEMA);
@@ -240,7 +240,7 @@ public class FlattenTest {
         // Intentionally leave this entire value empty since it is optional
         Struct value = new Struct(schema);
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, schema, value));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, schema, value));
 
         assertNotNull(transformed);
         Schema transformedSchema = transformed.valueSchema();
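
A detail that makes the shared fixture safe for FlattenTest's two expected-exception
tests: JUnit 4 runs @After even when the test body throws, so the transforms are
still closed when apply() exits with the DataException the test expects. A compact
sketch of that interaction, trimmed to the relevant pieces from the hunk above (the
class name is illustrative):

    import java.util.Collections;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.errors.DataException;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.transforms.Flatten;
    import org.junit.After;
    import org.junit.Test;

    public class FlattenTeardownSketch {
        private final Flatten<SourceRecord> xformValue = new Flatten.Value<>();

        @After
        public void teardown() {
            // Runs on every exit path, including the expected DataException below
            xformValue.close();
        }

        @Test(expected = DataException.class)
        public void topLevelStructRequired() {
            xformValue.configure(Collections.<String, String>emptyMap());
            // A primitive value is rejected: Flatten requires a top-level Struct or Map
            xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42));
        }
    }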

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
index 299aab3..1135b85 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.transforms;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -27,10 +28,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class HoistFieldTest {
+    private final HoistField<SinkRecord> xform = new HoistField.Key<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void schemaless() {
-        final HoistField<SinkRecord> xform = new HoistField.Key<>();
         xform.configure(Collections.singletonMap("field", "magic"));
 
         final SinkRecord record = new SinkRecord("test", 0, null, 42, null, null, 0);
@@ -42,7 +48,6 @@ public class HoistFieldTest {
 
     @Test
     public void withSchema() {
-        final HoistField<SinkRecord> xform = new HoistField.Key<>();
         xform.configure(Collections.singletonMap("field", "magic"));
 
         final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
index 4ce6ad4..a0a0975 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -32,14 +33,17 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 
 public class InsertFieldTest {
+    private InsertField<SourceRecord> xform = new InsertField.Value<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test(expected = DataException.class)
     public void topLevelStructRequired() {
-        final InsertField<SourceRecord> xform = new InsertField.Value<>();
-        xform.configure(Collections.singletonMap("topic.field", "topic_field"));
-        xform.apply(new SourceRecord(null, null,
-                "", 0,
-                Schema.INT32_SCHEMA, 42));
+        xform.apply(new SourceRecord(null, null, "", 0, Schema.INT32_SCHEMA, 42));
     }
 
     @Test
@@ -51,7 +55,6 @@ public class InsertFieldTest {
         props.put("static.field", "instance_id");
         props.put("static.value", "my-instance-id");
 
-        final InsertField<SourceRecord> xform = new InsertField.Value<>();
         xform.configure(props);
 
         final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
@@ -94,7 +97,6 @@ public class InsertFieldTest {
         props.put("static.field", "instance_id");
         props.put("static.value", "my-instance-id");
 
-        final InsertField<SourceRecord> xform = new InsertField.Value<>();
         xform.configure(props);
 
         final SourceRecord record = new SourceRecord(null, null, "test", 0,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
index cc001f1..aa47001 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
@@ -32,8 +32,9 @@ public class RegexRouterTest {
         props.put("replacement", replacement);
         final RegexRouter<SinkRecord> router = new RegexRouter<>();
         router.configure(props);
-        return router.apply(new SinkRecord(topic, 0, null, null, null, null, 0))
-                .topic();
+        String sinkTopic = router.apply(new SinkRecord(topic, 0, null, null, null, null, 0)).topic();
+        router.close();
+        return sinkTopic;
     }
 
     @Test
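
RegexRouterTest is the one spot in this patch where the transform stays local to a
helper, so the cleanup binds the routed topic to sinkTopic, closes the router, and
then returns. Since Transformation extends Closeable, the helper could equally use
try-with-resources; a sketch of that alternative (not what the commit does, and it
assumes the helper's signature and the test class's existing imports for Map,
HashMap, SinkRecord, and RegexRouter):

    private static String apply(String regex, String replacement, String topic) {
        final Map<String, String> props = new HashMap<>();
        props.put("regex", regex);
        props.put("replacement", replacement);
        // The router is closed automatically, even if apply() throws
        try (RegexRouter<SinkRecord> router = new RegexRouter<>()) {
            router.configure(props);
            return router.apply(new SinkRecord(topic, 0, null, null, null, null, 0)).topic();
        }
    }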

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
index e3d9d3a..6a1a13a 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -28,11 +29,15 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 
 public class ReplaceFieldTest {
+    private ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void schemaless() {
-        final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
-
         final Map<String, String> props = new HashMap<>();
         props.put("blacklist", "dont");
         props.put("renames", "abc:xyz,foo:bar");
@@ -57,8 +62,6 @@ public class ReplaceFieldTest {
 
     @Test
     public void withSchema() {
-        final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
-
         final Map<String, String> props = new HashMap<>();
         props.put("whitelist", "abc,foo");
         props.put("renames", "abc:xyz,foo:bar");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
index 206c51e..257b382 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -31,10 +32,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
 
 public class SetSchemaMetadataTest {
+    private final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void schemaNameUpdate() {
-        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
         xform.configure(Collections.singletonMap("schema.name", "foo"));
         final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
         final SinkRecord updatedRecord = xform.apply(record);
@@ -43,7 +49,6 @@ public class SetSchemaMetadataTest {
 
     @Test
     public void schemaVersionUpdate() {
-        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
         xform.configure(Collections.singletonMap("schema.version", 42));
         final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
         final SinkRecord updatedRecord = xform.apply(record);
@@ -56,7 +61,6 @@ public class SetSchemaMetadataTest {
         props.put("schema.name", "foo");
         props.put("schema.version", "42");
 
-        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
         xform.configure(props);
 
         final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
@@ -83,7 +87,6 @@ public class SetSchemaMetadataTest {
         final Map<String, String> props = new HashMap<>();
         props.put("schema.name", "foo");
         props.put("schema.version", "42");
-        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
         xform.configure(props);
 
         final SinkRecord record = new SinkRecord("", 0, null, null, schema, value, 0);
