Repository: kafka
Updated Branches:
  refs/heads/trunk 3bfc073f0 -> f87d58b79


http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
index 12beef8..475066f 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.data.Time;
 import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Calendar;
@@ -38,7 +39,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class TimestampConverterTest {
-
     private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
     private static final Calendar EPOCH;
     private static final Calendar TIME;
@@ -48,6 +48,9 @@ public class TimestampConverterTest {
     private static final String STRING_DATE_FMT = "yyyy MM dd HH mm ss SSS z";
     private static final String DATE_PLUS_TIME_STRING;
 
+    private final TimestampConverter<SourceRecord> xformKey = new TimestampConverter.Key<>();
+    private final TimestampConverter<SourceRecord> xformValue = new TimestampConverter.Value<>();
+
     static {
         EPOCH = GregorianCalendar.getInstance(UTC);
         EPOCH.setTimeInMillis(0L);
@@ -73,31 +76,33 @@ public class TimestampConverterTest {
 
     // Configuration
 
+    @After
+    public void teardown() {
+        xformKey.close();
+        xformValue.close();
+    }
+
     @Test(expected = ConfigException.class)
     public void testConfigNoTargetType() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        xform.configure(Collections.<String, String>emptyMap());
+        xformValue.configure(Collections.<String, String>emptyMap());
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigInvalidTargetType() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"invalid"));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "invalid"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigMissingFormat() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"string"));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "string"));
     }
 
     @Test(expected = ConfigException.class)
     public void testConfigInvalidFormat() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
         config.put(TimestampConverter.FORMAT_CONFIG, "bad-format");
-        xform.configure(config);
+        xformValue.configure(config);
     }
 
 
@@ -105,9 +110,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessIdentity() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, DATE_PLUS_TIME.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -115,9 +119,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimestampToDate() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Date"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, DATE_PLUS_TIME.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Date"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE.getTime(), transformed.value());
@@ -125,9 +128,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimestampToTime() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Time"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, DATE_PLUS_TIME.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Time"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(TIME.getTime(), transformed.value());
@@ -135,9 +137,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimestampToUnix() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"unix"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, DATE_PLUS_TIME.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "unix"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
@@ -145,12 +146,11 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimestampToString() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
-        xform.configure(config);
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(config);
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
@@ -161,9 +161,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessDateToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, DATE.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE.getTime()));
 
         assertNull(transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -172,9 +171,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessTimeToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, TIME.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, TIME.getTime()));
 
         assertNull(transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -183,9 +181,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessUnixToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, DATE_PLUS_TIME_UNIX));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME_UNIX));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -193,12 +190,11 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessStringToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
-        xform.configure(config);
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, DATE_PLUS_TIME_STRING));
+        xformValue.configure(config);
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, DATE_PLUS_TIME_STRING));
 
         assertNull(transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -209,9 +205,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaIdentity() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -219,9 +214,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimestampToDate() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Date"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Date"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Date.SCHEMA, transformed.valueSchema());
         assertEquals(DATE.getTime(), transformed.value());
@@ -229,9 +223,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimestampToTime() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Time"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Time"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Time.SCHEMA, transformed.valueSchema());
         assertEquals(TIME.getTime(), transformed.value());
@@ -239,9 +232,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimestampToUnix() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"unix"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "unix"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
@@ -249,12 +241,11 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimestampToString() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
-        xform.configure(config);
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
+        xformValue.configure(config);
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
 
         assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
@@ -265,9 +256,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaDateToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, Date.SCHEMA, DATE.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Date.SCHEMA, DATE.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -276,9 +266,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaTimeToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, Time.SCHEMA, TIME.getTime()));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Time.SCHEMA, TIME.getTime()));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         // No change expected since the source type is coarser-grained
@@ -287,9 +276,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaUnixToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
+        
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -297,12 +285,11 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaStringToTimestamp() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
         config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
-        xform.configure(config);
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
+        xformValue.configure(config);
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
 
         assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@@ -313,14 +300,13 @@ public class TimestampConverterTest {
 
     @Test
     public void testSchemalessFieldConversion() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date");
         config.put(TimestampConverter.FIELD_CONFIG, "ts");
-        xform.configure(config);
+        xformValue.configure(config);
 
         Object value = Collections.singletonMap("ts", 
DATE_PLUS_TIME.getTime());
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, value));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, null, value));
 
         assertNull(transformed.valueSchema());
         assertEquals(Collections.singletonMap("ts", DATE.getTime()), 
transformed.value());
@@ -328,11 +314,10 @@ public class TimestampConverterTest {
 
     @Test
     public void testWithSchemaFieldConversion() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Value<>();
         Map<String, String> config = new HashMap<>();
         config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
         config.put(TimestampConverter.FIELD_CONFIG, "ts");
-        xform.configure(config);
+        xformValue.configure(config);
 
         // ts field is a unix timestamp
         Schema structWithTimestampFieldSchema = SchemaBuilder.struct()
@@ -343,7 +328,7 @@ public class TimestampConverterTest {
         original.put("ts", DATE_PLUS_TIME_UNIX);
         original.put("other", "test");
 
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, structWithTimestampFieldSchema, original));
+        SourceRecord transformed = xformValue.apply(new SourceRecord(null, 
null, "topic", 0, structWithTimestampFieldSchema, original));
 
         Schema expectedSchema = SchemaBuilder.struct()
                 .field("ts", Timestamp.SCHEMA)
@@ -359,9 +344,8 @@ public class TimestampConverterTest {
 
     @Test
     public void testKey() {
-        TimestampConverter<SourceRecord> xform = new 
TimestampConverter.Key<>();
-        
xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, 
"Timestamp"));
-        SourceRecord transformed = xform.apply(new SourceRecord(null, null, 
"topic", 0, null, DATE_PLUS_TIME.getTime(), null, null));
+        
xformKey.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG,
 "Timestamp"));
+        SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, 
"topic", 0, null, DATE_PLUS_TIME.getTime(), null, null));
 
         assertNull(transformed.keySchema());
         assertEquals(DATE_PLUS_TIME.getTime(), transformed.key());

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
index 595a71c..ba823ba 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.transforms;
 
 import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -24,10 +25,15 @@ import java.util.Collections;
 import static org.junit.Assert.assertEquals;
 
 public class TimestampRouterTest {
+    private final TimestampRouter<SourceRecord> xform = new TimestampRouter<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void defaultConfiguration() {
-        final TimestampRouter<SourceRecord> xform = new TimestampRouter<>();
         xform.configure(Collections.<String, Object>emptyMap()); // defaults
         final SourceRecord record = new SourceRecord(
                 null, null,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
----------------------------------------------------------------------
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
index 69fb026..e2dfa17 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.After;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -29,10 +30,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 public class ValueToKeyTest {
+    private final ValueToKey<SinkRecord> xform = new ValueToKey<>();
+
+    @After
+    public void teardown() {
+        xform.close();
+    }
 
     @Test
     public void schemaless() {
-        final ValueToKey<SinkRecord> xform = new ValueToKey<>();
         xform.configure(Collections.singletonMap("fields", "a,b"));
 
         final HashMap<String, Integer> value = new HashMap<>();
@@ -53,7 +59,6 @@ public class ValueToKeyTest {
 
     @Test
     public void withSchema() {
-        final ValueToKey<SinkRecord> xform = new ValueToKey<>();
         xform.configure(Collections.singletonMap("fields", "a,b"));
 
         final Schema valueSchema = SchemaBuilder.struct()

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index ca0e916..9637927 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -53,7 +53,6 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
  * @see KGroupedStream
  * @see KStreamBuilder#stream(String...)
  */
-@SuppressWarnings("unused")
 @InterfaceStability.Evolving
 public interface KStream<K, V> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index b941f78..46769eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -1039,7 +1039,6 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param queryableStoreName the state store name; If {@code null} this is 
the equivalent of {@link KStreamBuilder#globalTable(Serde, Serde, String)} ()}
      * @return a {@link GlobalKTable} for the specified topic
      */
-    @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
                                                  final TimestampExtractor 
timestampExtractor,
@@ -1083,14 +1082,13 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param storeSupplier user defined state store supplier. Cannot be 
{@code null}.
      * @return a {@link GlobalKTable} for the specified topic
      */
-    @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
                                                  final String topic,
                                                  final 
StateStoreSupplier<KeyValueStore> storeSupplier) {
         return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
     }
-    
+
     /**
      * Create a {@link GlobalKTable} for the specified topic.
      * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
@@ -1121,7 +1119,6 @@ public class KStreamBuilder extends TopologyBuilder {
      *                           {@link KStreamBuilder#globalTable(Serde, 
Serde, String)} ()}
      * @return a {@link GlobalKTable} for the specified topic
      */
-    @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
                                                  final String topic,
@@ -1198,7 +1195,6 @@ public class KStreamBuilder extends TopologyBuilder {
      * @param topic     the topic name; cannot be {@code null}
      * @return a {@link GlobalKTable} for the specified topic
      */
-    @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
                                                  final String topic) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 6ed3e84..a1b40a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -92,7 +92,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> 
implements KGroupedStre
     }
 
 
-    @SuppressWarnings("unchecked")
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> 
reducer,
                                                             final Windows<W> 
windows,
@@ -101,7 +100,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> 
implements KGroupedStre
         return reduce(reducer, windows, windowedStore(keySerde, valSerde, 
windows, getOrCreateName(queryableStoreName, REDUCE_NAME)));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> 
reducer,
                                                             final Windows<W> 
windows) {
@@ -152,7 +150,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> 
implements KGroupedStre
                 storeSupplier);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final 
Initializer<T> initializer,
                                                                   final 
Aggregator<? super K, ? super V, T> aggregator,
@@ -163,7 +160,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> 
implements KGroupedStre
         return aggregate(initializer, aggregator, windows, 
windowedStore(keySerde, aggValueSerde, windows, 
getOrCreateName(queryableStoreName, AGGREGATE_NAME)));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final 
Initializer<T> initializer,
                                                                   final 
Aggregator<? super K, ? super V, T> aggregator,
@@ -266,7 +262,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> 
implements KGroupedStre
 
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> 
initializer,
                                                 final Aggregator<? super K, ? 
super V, T> aggregator,
@@ -309,7 +304,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> 
implements KGroupedStre
         return count(sessionWindows, (String) null);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
                                            final 
StateStoreSupplier<SessionStore> storeSupplier) {
@@ -350,7 +344,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> 
implements KGroupedStre
                               
.sessionWindowed(sessionWindows.maintainMs()).build());
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                          final SessionWindows sessionWindows) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index d30177c..1b26a5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -67,7 +67,6 @@ class KTableKTableJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R, V1,
             this.valueGetter = valueGetter;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 8dc330d..c308a0d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -57,7 +57,6 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
             this.valueGetter = valueGetter;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 90a9f77..9cee4f3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -56,7 +56,6 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
             this.valueGetter = valueGetter;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 05ecf40..b43efaa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -57,7 +57,6 @@ class KTableKTableRightJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
             this.valueGetter = valueGetter;
         }
 
-        @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 38beb63..4c2b40f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -49,7 +49,6 @@ public class GlobalStateUpdateTask implements 
GlobalStateMaintainer {
         this.deserializationExceptionHandler = deserializationExceptionHandler;
     }
 
-    @SuppressWarnings("unchecked")
     public Map<TopicPartition, Long> initialize() {
         final Set<String> storeNames = stateMgr.initialize(processorContext);
         final Map<String, String> storeNameToTopic = 
topology.storeToChangelogTopic();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9dc5640..eb75b14 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1135,7 +1135,6 @@ public class StreamThread extends Thread {
         streamsMetrics.removeAllSensors();
     }
 
-    @SuppressWarnings("ThrowableNotThrown")
     private void shutdownTasksAndState(final boolean cleanRun) {
         log.debug("{} Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}",
             logPrefix, activeTasks.keySet(), standbyTasks.keySet(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 77fb58a..8607472 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -120,9 +120,8 @@ public class AssignmentInfo {
     public static AssignmentInfo decode(ByteBuffer data) {
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
-        DataInputStream in = new DataInputStream(new ByteBufferInputStream(data));
 
-        try {
+        try (DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
             // Decode version
             int version = in.readInt();
             if (version < 0 || version > CURRENT_VERSION) {
@@ -156,7 +155,6 @@ public class AssignmentInfo {
 
             return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
 
-
         } catch (IOException ex) {
             throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java 
b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index d43c613..2a54cb5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -61,7 +61,6 @@ public final class StateSerdes<K, V> {
      * @param valueSerde    the serde for values; cannot be null
      * @throws IllegalArgumentException if key or value serde is null
      */
-    @SuppressWarnings("unchecked")
     public StateSerdes(final String topic,
                        final Serde<K> keySerde,
                        final Serde<V> valueSerde) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 6190b88..b7d41b4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -53,7 +53,6 @@ class CachingKeyValueStore<K, V> extends 
WrappedStateStore.AbstractStateStore im
         this.valueSerde = valueSerde;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         underlying.init(context, root);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 9a4a97c..b786ce4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -63,7 +63,6 @@ class CachingWindowStore<K, V> extends 
WrappedStateStore.AbstractStateStore impl
         this.cacheFunction = new SegmentedCacheFunction(keySchema, 
segmentInterval);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         underlying.init(context, root);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
index 9a826c4..34fe8f6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java
@@ -67,7 +67,6 @@ class ChangeLoggingSegmentedBytesStore extends 
WrappedStateStore.AbstractStateSt
 
 
     @Override
-    @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
         bytesStore.init(context, root);
         changeLogger = new StoreChangeLogger<>(

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 7034592..4d93a9a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -239,7 +239,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, 
V> {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public synchronized void put(K key, V value) {
         Objects.requireNonNull(key, "key cannot be null");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index 82e5b23..75b7910 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -318,7 +318,7 @@ public class KStreamBuilderTest {
         final String topicName = "topic-1";
         
         builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName);
-        
+
         
assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches());
         
assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches());
     }
@@ -373,7 +373,7 @@ public class KStreamBuilderTest {
         final String topic = "topic-5";
 
         builder.stream(topicPattern);
-        
+
         
assertFalse(builder.latestResetTopicsPattern().matcher(topic).matches());
         
assertFalse(builder.earliestResetTopicsPattern().matcher(topic).matches());
 
@@ -401,7 +401,6 @@ public class KStreamBuilderTest {
         
assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void kStreamTimestampExtractorShouldBeNull() throws Exception {
         builder.stream("topic");
@@ -409,7 +408,6 @@ public class KStreamBuilderTest {
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() 
throws Exception {
         builder.stream(new MockTimestampExtractor(), null, null, "topic");
@@ -419,7 +417,6 @@ public class KStreamBuilderTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() 
throws Exception {
         builder.stream(null, new MockTimestampExtractor(), null, null, 
"topic");
@@ -427,7 +424,6 @@ public class KStreamBuilderTest {
         assertThat(processorTopology.source("topic").getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorToTablePerSource() throws Exception 
{
         builder.table("topic", "store");
@@ -435,7 +431,6 @@ public class KStreamBuilderTest {
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void kTableTimestampExtractorShouldBeNull() throws Exception {
         builder.table("topic", "store");
@@ -443,7 +438,6 @@ public class KStreamBuilderTest {
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() 
throws Exception {
         builder.table(null, new MockTimestampExtractor(), null, null, "topic", 
"store");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index c69cd70..0662944 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -47,7 +47,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-@SuppressWarnings("unchecked")
 public class KStreamSessionWindowAggregateProcessorTest {
 
     private static final long GAP_MS = 5 * 60 * 1000L;
@@ -84,7 +83,6 @@ public class KStreamSessionWindowAggregateProcessorTest {
     private MockProcessorContext context;
 
 
-    @SuppressWarnings("unchecked")
     @Before
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 316494d..98bb346 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -89,6 +89,8 @@ public class WindowedStreamPartitionerTest {
                 assertEquals(expected, actual);
             }
         }
+
+        defaultPartitioner.close();
     }
 
     @Test
@@ -113,6 +115,8 @@ public class WindowedStreamPartitionerTest {
         Serializer<?> inner1 = windowedSerializer1.innerSerializer();
         assertNotNull("Inner serializer should be not null", inner1);
         assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer);
+        windowedSerializer.close();
+        windowedSerializer1.close();
     }
 
     @Test
@@ -137,5 +141,7 @@ public class WindowedStreamPartitionerTest {
         Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
         assertNotNull("Inner deserializer should be not null", inner1);
         assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer);
+        windowedDeserializer.close();
+        windowedDeserializer1.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index c0ce9e7..bad193a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -78,7 +78,7 @@ public class TopologyBuilderTest {
     @Test
     public void shouldAddSourcePatternWithOffsetReset() {
         final TopologyBuilder builder = new TopologyBuilder();
-        
+
         final String earliestTopicPattern = "earliest.*Topic";
         final String latestTopicPattern = "latest.*Topic";
 
@@ -107,7 +107,7 @@ public class TopologyBuilderTest {
         final TopologyBuilder builder = new TopologyBuilder();
         final Serde<String> stringSerde = Serdes.String();
         final Pattern expectedPattern = Pattern.compile("test-.*");
-        
+
         builder.addSource("source", stringSerde.deserializer(), 
stringSerde.deserializer(), Pattern.compile("test-.*"));
 
         assertEquals(expectedPattern.pattern(), 
builder.sourceTopicPattern().pattern());
@@ -143,7 +143,7 @@ public class TopologyBuilderTest {
     }
 
 
-    
+
     @Test(expected = TopologyBuilderException.class)
     public void testAddSourceWithSameName() {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -579,7 +579,6 @@ public class TopologyBuilderTest {
         assertEquals(2, properties.size());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() 
throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -596,7 +595,6 @@ public class TopologyBuilderTest {
         assertEquals(1, properties.size());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void 
shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws 
Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -702,7 +700,6 @@ public class TopologyBuilderTest {
 
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorPerSource() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -711,7 +708,6 @@ public class TopologyBuilderTest {
         assertThat(processorTopology.source("topic").getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorWithOffsetResetPerSource() throws 
Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -720,7 +716,6 @@ public class TopologyBuilderTest {
         assertThat(processorTopology.source("topic").getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldAddTimestampExtractorWithPatternPerSource() throws 
Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -730,7 +725,6 @@ public class TopologyBuilderTest {
         
assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void 
shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() throws 
Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -740,7 +734,6 @@ public class TopologyBuilderTest {
         
assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void 
shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() throws 
Exception {
         final TopologyBuilder builder = new TopologyBuilder();
@@ -749,7 +742,6 @@ public class TopologyBuilderTest {
         assertThat(processorTopology.source("topic").getTimestampExtractor(), 
instanceOf(MockTimestampExtractor.class));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void 
shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() 
throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
index 56e2410..e6cca87 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertNotEquals;
 
 public class QuickUnionTest {
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testUnite() {
         QuickUnion<Long> qu = new QuickUnion<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index a358be5..25c3cbd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -177,7 +177,6 @@ public class StandbyTaskTest {
 
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testUpdate() throws Exception {
         StreamsConfig config = createConfig(baseDir);
@@ -224,7 +223,6 @@ public class StandbyTaskTest {
 
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testUpdateKTable() throws Exception {
         consumer.assign(Utils.mkList(ktable));

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 17eb50a..a6d1179 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -131,7 +131,6 @@ public class StreamPartitionAssignorTest {
         partitionAssignor.configure(configurationMap);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testSubscription() throws Exception {
         builder.addSource("source1", "topic1");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1a0bebe..a27fb62 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -724,7 +724,6 @@ public class StreamTaskTest {
         });
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception {
         task.close(true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index b4598fd..c6d12c6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -185,7 +185,6 @@ public class KeyValueStoreTestDriver<K, V> {
         final Producer<byte[], byte[]> producer = new MockProducer<>(true, 
rawSerializer, rawSerializer);
 
         final RecordCollector recordCollector = new 
RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
-            @SuppressWarnings("unchecked")
             @Override
             public <K1, V1> void send(final String topic,
                                       final K1 key,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
index a77d4ac..ff7cdc3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java
@@ -61,7 +61,6 @@ public class ChangeLoggingSegmentedBytesStoreTest {
     private final ChangeLoggingSegmentedBytesStore store = new 
ChangeLoggingSegmentedBytesStore(bytesStore);
     private final Map sent = new HashMap<>();
 
-    @SuppressWarnings("unchecked")
     @Before
     public void setUp() throws Exception {
         context.setTime(0);
@@ -74,7 +73,6 @@ public class ChangeLoggingSegmentedBytesStoreTest {
         store.close();
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldLogPuts() throws Exception {
         final byte[] value1 = {0};
@@ -88,7 +86,6 @@ public class ChangeLoggingSegmentedBytesStoreTest {
         assertArrayEquals(value2, (byte[]) sent.get(key2));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldLogRemoves() throws Exception {
         final Bytes key1 = Bytes.wrap(new byte[]{0});

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 4054990..0fa5216 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -46,7 +46,6 @@ public class CompositeReadOnlyKeyValueStoreTest {
     private KeyValueStore<String, String>
         otherUnderlyingStore;
 
-    @SuppressWarnings("unchecked")
     @Before
     public void before() {
         final StateStoreProviderStub stubProviderOne = new 
StateStoreProviderStub(false);
@@ -141,8 +140,6 @@ public class CompositeReadOnlyKeyValueStoreTest {
         } catch (UnsupportedOperationException e) { }
     }
 
-
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldFindValueForKeyWhenMultiStores() throws Exception {
         final KeyValueStore<String, String> cache = newStoreInstance();
@@ -167,7 +164,6 @@ public class CompositeReadOnlyKeyValueStoreTest {
         assertEquals(2, results.size());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldSupportRangeAcrossMultipleKVStores() throws Exception {
         final KeyValueStore<String, String> cache = newStoreInstance();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
index bc21a7a..4baecb1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java
@@ -43,6 +43,7 @@ public class DelegatingPeekingKeyValueIteratorTest {
         assertEquals("A", peekingIterator.peekNextKey());
         assertEquals("A", peekingIterator.peekNextKey());
         assertTrue(peekingIterator.hasNext());
+        peekingIterator.close();
     }
 
     @Test
@@ -52,6 +53,7 @@ public class DelegatingPeekingKeyValueIteratorTest {
         assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext());
         assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext());
         assertTrue(peekingIterator.hasNext());
+        peekingIterator.close();
     }
 
     @Test
@@ -71,18 +73,21 @@ public class DelegatingPeekingKeyValueIteratorTest {
             index++;
         }
         assertEquals(kvs.length, index);
+        peekingIterator.close();
     }
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled() throws Exception {
         final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
         peekingIterator.next();
+        peekingIterator.close();
     }
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled() throws Exception {
         final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all());
         peekingIterator.peekNextKey();
+        peekingIterator.close();
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index 6e0059f..89a4d63 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -64,6 +64,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
             values[index++] = value;
             assertArrayEquals(bytes[bytesIndex++], value);
         }
+        iterator.close();
     }
 
 
@@ -171,6 +172,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
             assertArrayEquals(bytes[bytesIndex++], keys);
             iterator.next();
         }
+        iterator.close();
     }
 
     private MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> createIterator() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index fed39b7..2088fbe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -80,6 +80,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
             assertArrayEquals(expected.value, next.value);
             assertEquals(expected.key, next.key);
         }
+        iterator.close();
     }
 
     @Test
@@ -98,6 +99,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
         assertThat(iterator.peekNextKey(), equalTo(0L));
         iterator.next();
         assertThat(iterator.peekNextKey(), equalTo(10L));
+        iterator.close();
     }
 
     @Test
@@ -112,5 +114,6 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest {
         assertThat(iterator.peekNextKey(), equalTo(0L));
         iterator.next();
         assertThat(iterator.peekNextKey(), equalTo(10L));
+        iterator.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
index a7e1aed..3e935cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java
@@ -46,7 +46,6 @@ public class MeteredSegmentedBytesStoreTest {
     private final Set<String> latencyRecorded = new HashSet<>();
     private final Set<String> throughputRecorded = new HashSet<>();
 
-    @SuppressWarnings("unchecked")
     @Before
     public void setUp() throws Exception {
         final Metrics metrics = new Metrics();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
index 237514e..ff7d234 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java
@@ -133,7 +133,6 @@ public class RocksDBKeyValueStoreSupplierTest {
         assertThat(store, is(instanceOf(MeteredKeyValueStore.class)));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenCached() throws Exception {
         store = createStore(false, true);
@@ -142,7 +141,6 @@ public class RocksDBKeyValueStoreSupplierTest {
         assertFalse(metrics.metrics().isEmpty());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenLogged() throws Exception {
         store = createStore(true, false);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
index 70f3708..97936fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java
@@ -112,7 +112,6 @@ public class RocksDBSessionStoreSupplierTest {
         assertThat(store, is(instanceOf(RocksDBSessionStore.class)));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenCached() throws Exception {
         store = createStore(false, true);
@@ -121,7 +120,6 @@ public class RocksDBSessionStoreSupplierTest {
         assertFalse(metrics.metrics().isEmpty());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenLogged() throws Exception {
         store = createStore(true, false);
@@ -130,7 +128,6 @@ public class RocksDBSessionStoreSupplierTest {
         assertFalse(metrics.metrics().isEmpty());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception {
         store = createStore(false, false);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
index 77fe8ee..f177aa3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java
@@ -135,7 +135,6 @@ public class RocksDBWindowStoreSupplierTest {
         assertThat(store, is(instanceOf(RocksDBWindowStore.class)));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenCached() throws Exception {
         store = createStore(false, true, 3);
@@ -144,7 +143,6 @@ public class RocksDBWindowStoreSupplierTest {
         assertFalse(metrics.metrics().isEmpty());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenLogged() throws Exception {
         store = createStore(true, false, 3);
@@ -153,7 +151,6 @@ public class RocksDBWindowStoreSupplierTest {
         assertFalse(metrics.metrics().isEmpty());
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception {
         store = createStore(false, false, 3);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 8a3a8ba..c2b03c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -39,7 +39,6 @@ public class StoreChangeLoggerTest {
 
     private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
             new RecordCollectorImpl(null, "StoreChangeLoggerTest") {
-                @SuppressWarnings("unchecked")
                 @Override
                 public <K1, V1> void send(final String topic,
                                           final K1 key,
@@ -71,7 +70,6 @@ public class StoreChangeLoggerTest {
         context.close();
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testAddRemove() throws Exception {
         context.setTime(1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 0af2594..c01e169 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -101,6 +101,7 @@ public class BrokerCompatibilityTest {
 
 
         System.out.println("close Kafka Streams");
+        producer.close();
         streams.close();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 11e1ae8..9193d1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -77,7 +77,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
     // This main() is not used by the system test. It is intended to be used for local debugging.
     public static void main(String[] args) throws Exception {
         final String kafka = "localhost:9092";
-        final String zookeeper = "localhost:2181";
         final File stateDir = TestUtils.tempDirectory();
 
         final int numKeys = 20;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index b9288d7..a9537a7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -370,108 +370,109 @@ public class ClientCompatibilityTest {
         consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 512);
         ClientCompatibilityTestDeserializer deserializer =
             new ClientCompatibilityTestDeserializer(testConfig.expectClusterId);
-        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
-        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(testConfig.topic);
-        if (partitionInfos.size() < 1)
-            throw new RuntimeException("Expected at least one partition for topic " + testConfig.topic);
-        final Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
-        final LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
-        for (PartitionInfo partitionInfo : partitionInfos) {
-            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
-            timestampsToSearch.put(topicPartition, prodTimeMs);
-            topicPartitions.add(topicPartition);
-        }
-        final OffsetsForTime offsetsForTime = new OffsetsForTime();
-        tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
-                new Invoker() {
-                    @Override
-                    public void invoke() {
-                        offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
-                    }
-                },
-                new ResultTester() {
-                    @Override
-                    public void test() {
-                        log.info("offsetsForTime = {}", offsetsForTime.result);
-                    }
-                });
-        // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
-        // should work.
-        consumer.beginningOffsets(timestampsToSearch.keySet());
-        consumer.endOffsets(timestampsToSearch.keySet());
-
-        consumer.assign(topicPartitions);
-        consumer.seekToBeginning(topicPartitions);
-        final Iterator<byte[]> iter = new Iterator<byte[]>() {
-            private static final int TIMEOUT_MS = 10000;
-            private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = null;
-            private byte[] next = null;
-
-            private byte[] fetchNext() {
-                while (true) {
-                    long curTime = Time.SYSTEM.milliseconds();
-                    if (curTime - prodTimeMs > TIMEOUT_MS)
-                        throw new RuntimeException("Timed out after " + TIMEOUT_MS + " ms.");
-                    if (recordIter == null) {
-                        ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
-                        recordIter = records.iterator();
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer)) {
+            final List<PartitionInfo> partitionInfos = consumer.partitionsFor(testConfig.topic);
+            if (partitionInfos.size() < 1)
+                throw new RuntimeException("Expected at least one partition for topic " + testConfig.topic);
+            final Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+            final LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
+            for (PartitionInfo partitionInfo : partitionInfos) {
+                TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
+                timestampsToSearch.put(topicPartition, prodTimeMs);
+                topicPartitions.add(topicPartition);
+            }
+            final OffsetsForTime offsetsForTime = new OffsetsForTime();
+            tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
+                    new Invoker() {
+                        @Override
+                        public void invoke() {
+                            offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
+                        }
+                    },
+                    new ResultTester() {
+                        @Override
+                        public void test() {
+                            log.info("offsetsForTime = {}", offsetsForTime.result);
+                        }
+                    });
+            // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
+            // should work.
+            consumer.beginningOffsets(timestampsToSearch.keySet());
+            consumer.endOffsets(timestampsToSearch.keySet());
+
+            consumer.assign(topicPartitions);
+            consumer.seekToBeginning(topicPartitions);
+            final Iterator<byte[]> iter = new Iterator<byte[]>() {
+                private static final int TIMEOUT_MS = 10000;
+                private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = null;
+                private byte[] next = null;
+
+                private byte[] fetchNext() {
+                    while (true) {
+                        long curTime = Time.SYSTEM.milliseconds();
+                        if (curTime - prodTimeMs > TIMEOUT_MS)
+                            throw new RuntimeException("Timed out after " + TIMEOUT_MS + " ms.");
+                        if (recordIter == null) {
+                            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+                            recordIter = records.iterator();
+                        }
+                        if (recordIter.hasNext())
+                            return recordIter.next().value();
+                        recordIter = null;
                     }
-                    if (recordIter.hasNext())
-                        return recordIter.next().value();
-                    recordIter = null;
                 }
-            }
 
-            @Override
-            public boolean hasNext() {
-                if (next != null)
-                    return true;
-                next = fetchNext();
-                return next != null;
-            }
+                @Override
+                public boolean hasNext() {
+                    if (next != null)
+                        return true;
+                    next = fetchNext();
+                    return next != null;
+                }
 
-            @Override
-            public byte[] next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-                byte[] cur = next;
-                next = null;
-                return cur;
-            }
+                @Override
+                public byte[] next() {
+                    if (!hasNext())
+                        throw new NoSuchElementException();
+                    byte[] cur = next;
+                    next = null;
+                    return cur;
+                }
 
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-        byte[] next = iter.next();
-        try {
-            compareArrays(message1, next);
-            log.debug("Found first message...");
-        } catch (RuntimeException e) {
-            throw new RuntimeException("The first message in this topic was not ours. Please use a new topic when " +
-                    "running this program.");
-        }
-        try {
-            next = iter.next();
-            if (testConfig.expectRecordTooLargeException)
-                throw new RuntimeException("Expected to get a RecordTooLargeException when reading a record " +
-                        "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+            byte[] next = iter.next();
             try {
-                compareArrays(message2, next);
+                compareArrays(message1, next);
+                log.debug("Found first message...");
             } catch (RuntimeException e) {
-                System.out.println("The second message in this topic was not ours. Please use a new " +
-                    "topic when running this program.");
-                Exit.exit(1);
+                throw new RuntimeException("The first message in this topic was not ours. Please use a new topic when " +
+                        "running this program.");
+            }
+            try {
+                next = iter.next();
+                if (testConfig.expectRecordTooLargeException) {
+                    throw new RuntimeException("Expected to get a RecordTooLargeException when reading a record " +
+                            "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+                }
+                try {
+                    compareArrays(message2, next);
+                } catch (RuntimeException e) {
+                    System.out.println("The second message in this topic was not ours. Please use a new " +
+                        "topic when running this program.");
+                    Exit.exit(1);
+                }
+            } catch (RecordTooLargeException e) {
+                log.debug("Got RecordTooLargeException", e);
+                if (!testConfig.expectRecordTooLargeException)
+                    throw new RuntimeException("Got an unexpected RecordTooLargeException when reading a record " +
+                        "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
             }
-        } catch (RecordTooLargeException e) {
-            log.debug("Got RecordTooLargeException", e);
-            if (!testConfig.expectRecordTooLargeException)
-                throw new RuntimeException("Got an unexpected RecordTooLargeException when reading a record " +
-                    "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+            log.debug("Closing consumer.");
         }
-        log.debug("Closing consumer.");
-        consumer.close();
         log.info("Closed consumer.");
     }
 

Reply via email to