Repository: kafka Updated Branches: refs/heads/trunk 3bfc073f0 -> f87d58b79
http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java index 12beef8..475066f 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampConverterTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Time; import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; import org.junit.Test; import java.util.Calendar; @@ -38,7 +39,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; public class TimestampConverterTest { - private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); private static final Calendar EPOCH; private static final Calendar TIME; @@ -48,6 +48,9 @@ public class TimestampConverterTest { private static final String STRING_DATE_FMT = "yyyy MM dd HH mm ss SSS z"; private static final String DATE_PLUS_TIME_STRING; + private final TimestampConverter<SourceRecord> xformKey = new TimestampConverter.Key<>(); + private final TimestampConverter<SourceRecord> xformValue = new TimestampConverter.Value<>(); + static { EPOCH = GregorianCalendar.getInstance(UTC); EPOCH.setTimeInMillis(0L); @@ -73,31 +76,33 @@ public class TimestampConverterTest { // Configuration + @After + public void teardown() { + xformKey.close(); + xformValue.close(); + } + @Test(expected = ConfigException.class) public void testConfigNoTargetType() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.<String, String>emptyMap()); + xformValue.configure(Collections.<String, String>emptyMap()); } @Test(expected = ConfigException.class) public void testConfigInvalidTargetType() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid")); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid")); } @Test(expected = ConfigException.class) public void testConfigMissingFormat() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "string")); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "string")); } @Test(expected = ConfigException.class) public void testConfigInvalidFormat() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); Map<String, String> config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_CONFIG, "bad-format"); - xform.configure(config); + xformValue.configure(config); } @@ -105,9 +110,8 @@ public class TimestampConverterTest { @Test public void testSchemalessIdentity() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); assertNull(transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); @@ -115,9 +119,8 @@ public class TimestampConverterTest { @Test public void testSchemalessTimestampToDate() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); assertNull(transformed.valueSchema()); assertEquals(DATE.getTime(), transformed.value()); @@ -125,9 +128,8 @@ public class TimestampConverterTest { @Test public void testSchemalessTimestampToTime() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); assertNull(transformed.valueSchema()); assertEquals(TIME.getTime(), transformed.value()); @@ -135,9 +137,8 @@ public class TimestampConverterTest { @Test public void testSchemalessTimestampToUnix() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); assertNull(transformed.valueSchema()); assertEquals(DATE_PLUS_TIME_UNIX, transformed.value()); @@ -145,12 +146,11 @@ public class TimestampConverterTest { @Test public void testSchemalessTimestampToString() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); Map<String, String> config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); - xform.configure(config); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); + xformValue.configure(config); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime())); assertNull(transformed.valueSchema()); assertEquals(DATE_PLUS_TIME_STRING, transformed.value()); @@ -161,9 +161,8 @@ public class TimestampConverterTest { @Test public void testSchemalessDateToTimestamp() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE.getTime())); assertNull(transformed.valueSchema()); // No change expected since the source type is coarser-grained @@ -172,9 +171,8 @@ public class TimestampConverterTest { @Test public void testSchemalessTimeToTimestamp() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, TIME.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, TIME.getTime())); assertNull(transformed.valueSchema()); // No change expected since the source type is coarser-grained @@ -183,9 +181,8 @@ public class TimestampConverterTest { @Test public void testSchemalessUnixToTimestamp() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX)); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX)); assertNull(transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); @@ -193,12 +190,11 @@ public class TimestampConverterTest { @Test public void testSchemalessStringToTimestamp() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); Map<String, String> config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); - xform.configure(config); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING)); + xformValue.configure(config); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING)); assertNull(transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); @@ -209,9 +205,8 @@ public class TimestampConverterTest { @Test public void testWithSchemaIdentity() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); assertEquals(Timestamp.SCHEMA, transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); @@ -219,9 +214,8 @@ public class TimestampConverterTest { @Test public void testWithSchemaTimestampToDate() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); assertEquals(Date.SCHEMA, transformed.valueSchema()); assertEquals(DATE.getTime(), transformed.value()); @@ -229,9 +223,8 @@ public class TimestampConverterTest { @Test public void testWithSchemaTimestampToTime() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); assertEquals(Time.SCHEMA, transformed.valueSchema()); assertEquals(TIME.getTime(), transformed.value()); @@ -239,9 +232,8 @@ public class TimestampConverterTest { @Test public void testWithSchemaTimestampToUnix() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema()); assertEquals(DATE_PLUS_TIME_UNIX, transformed.value()); @@ -249,12 +241,11 @@ public class TimestampConverterTest { @Test public void testWithSchemaTimestampToString() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); Map<String, String> config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); - xform.configure(config); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); + xformValue.configure(config); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime())); assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema()); assertEquals(DATE_PLUS_TIME_STRING, transformed.value()); @@ -265,9 +256,8 @@ public class TimestampConverterTest { @Test public void testWithSchemaDateToTimestamp() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime())); assertEquals(Timestamp.SCHEMA, transformed.valueSchema()); // No change expected since the source type is coarser-grained @@ -276,9 +266,8 @@ public class TimestampConverterTest { @Test public void testWithSchemaTimeToTimestamp() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime())); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime())); assertEquals(Timestamp.SCHEMA, transformed.valueSchema()); // No change expected since the source type is coarser-grained @@ -287,9 +276,8 @@ public class TimestampConverterTest { @Test public void testWithSchemaUnixToTimestamp() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX)); + xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX)); assertEquals(Timestamp.SCHEMA, transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); @@ -297,12 +285,11 @@ public class TimestampConverterTest { @Test public void testWithSchemaStringToTimestamp() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); Map<String, String> config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT); - xform.configure(config); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING)); + xformValue.configure(config); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING)); assertEquals(Timestamp.SCHEMA, transformed.valueSchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.value()); @@ -313,14 +300,13 @@ public class TimestampConverterTest { @Test public void testSchemalessFieldConversion() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); Map<String, String> config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); - xform.configure(config); + xformValue.configure(config); Object value = Collections.singletonMap("ts", DATE_PLUS_TIME.getTime()); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, value)); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, value)); assertNull(transformed.valueSchema()); assertEquals(Collections.singletonMap("ts", DATE.getTime()), transformed.value()); @@ -328,11 +314,10 @@ public class TimestampConverterTest { @Test public void testWithSchemaFieldConversion() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Value<>(); Map<String, String> config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); - xform.configure(config); + xformValue.configure(config); // ts field is a unix timestamp Schema structWithTimestampFieldSchema = SchemaBuilder.struct() @@ -343,7 +328,7 @@ public class TimestampConverterTest { original.put("ts", DATE_PLUS_TIME_UNIX); original.put("other", "test"); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, structWithTimestampFieldSchema, original)); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, structWithTimestampFieldSchema, original)); Schema expectedSchema = SchemaBuilder.struct() .field("ts", Timestamp.SCHEMA) @@ -359,9 +344,8 @@ public class TimestampConverterTest { @Test public void testKey() { - TimestampConverter<SourceRecord> xform = new TimestampConverter.Key<>(); - xform.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null)); + xformKey.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp")); + SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null)); assertNull(transformed.keySchema()); assertEquals(DATE_PLUS_TIME.getTime(), transformed.key()); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java index 595a71c..ba823ba 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/TimestampRouterTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.transforms; import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; import org.junit.Test; import java.util.Collections; @@ -24,10 +25,15 @@ import java.util.Collections; import static org.junit.Assert.assertEquals; public class TimestampRouterTest { + private final TimestampRouter<SourceRecord> xform = new TimestampRouter<>(); + + @After + public void teardown() { + xform.close(); + } @Test public void defaultConfiguration() { - final TimestampRouter<SourceRecord> xform = new TimestampRouter<>(); xform.configure(Collections.<String, Object>emptyMap()); // defaults final SourceRecord record = new SourceRecord( null, null, http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java index 69fb026..e2dfa17 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.After; import org.junit.Test; import java.util.Collections; @@ -29,10 +30,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; public class ValueToKeyTest { + private final ValueToKey<SinkRecord> xform = new ValueToKey<>(); + + @After + public void teardown() { + xform.close(); + } @Test public void schemaless() { - final ValueToKey<SinkRecord> xform = new ValueToKey<>(); xform.configure(Collections.singletonMap("fields", "a,b")); final HashMap<String, Integer> value = new HashMap<>(); @@ -53,7 +59,6 @@ public class ValueToKeyTest { @Test public void withSchema() { - final ValueToKey<SinkRecord> xform = new ValueToKey<>(); xform.configure(Collections.singletonMap("fields", "a,b")); final Schema valueSchema = SchemaBuilder.struct() http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index ca0e916..9637927 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -53,7 +53,6 @@ import org.apache.kafka.streams.processor.TopologyBuilder; * @see KGroupedStream * @see KStreamBuilder#stream(String...) */ -@SuppressWarnings("unused") @InterfaceStability.Evolving public interface KStream<K, V> { http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index b941f78..46769eb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -1039,7 +1039,6 @@ public class KStreamBuilder extends TopologyBuilder { * @param queryableStoreName the state store name; If {@code null} this is the equivalent of {@link KStreamBuilder#globalTable(Serde, Serde, String)} ()} * @return a {@link GlobalKTable} for the specified topic */ - @SuppressWarnings("unchecked") public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final TimestampExtractor timestampExtractor, @@ -1083,14 +1082,13 @@ public class KStreamBuilder extends TopologyBuilder { * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@link GlobalKTable} for the specified topic */ - @SuppressWarnings("unchecked") public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier) { return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier); } - + /** * Create a {@link GlobalKTable} for the specified topic. * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. @@ -1121,7 +1119,6 @@ public class KStreamBuilder extends TopologyBuilder { * {@link KStreamBuilder#globalTable(Serde, Serde, String)} ()} * @return a {@link GlobalKTable} for the specified topic */ - @SuppressWarnings("unchecked") public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, @@ -1198,7 +1195,6 @@ public class KStreamBuilder extends TopologyBuilder { * @param topic the topic name; cannot be {@code null} * @return a {@link GlobalKTable} for the specified topic */ - @SuppressWarnings("unchecked") public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic) { http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 6ed3e84..a1b40a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -92,7 +92,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre } - @SuppressWarnings("unchecked") @Override public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Windows<W> windows, @@ -101,7 +100,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return reduce(reducer, windows, windowedStore(keySerde, valSerde, windows, getOrCreateName(queryableStoreName, REDUCE_NAME))); } - @SuppressWarnings("unchecked") @Override public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Windows<W> windows) { @@ -152,7 +150,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre storeSupplier); } - @SuppressWarnings("unchecked") @Override public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, @@ -163,7 +160,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return aggregate(initializer, aggregator, windows, windowedStore(keySerde, aggValueSerde, windows, getOrCreateName(queryableStoreName, AGGREGATE_NAME))); } - @SuppressWarnings("unchecked") @Override public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, @@ -266,7 +262,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre } - @SuppressWarnings("unchecked") @Override public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, @@ -309,7 +304,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return count(sessionWindows, (String) null); } - @SuppressWarnings("unchecked") @Override public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier) { @@ -350,7 +344,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre .sessionWindowed(sessionWindows.maintainMs()).build()); } - @SuppressWarnings("unchecked") @Override public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final SessionWindows sessionWindows) { http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java index d30177c..1b26a5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java @@ -67,7 +67,6 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, this.valueGetter = valueGetter; } - @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { super.init(context); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java index 8dc330d..c308a0d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java @@ -57,7 +57,6 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, this.valueGetter = valueGetter; } - @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { super.init(context); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java index 90a9f77..9cee4f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java @@ -56,7 +56,6 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, this.valueGetter = valueGetter; } - @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { super.init(context); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java index 05ecf40..b43efaa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java @@ -57,7 +57,6 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, this.valueGetter = valueGetter; } - @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { super.init(context); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 38beb63..4c2b40f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -49,7 +49,6 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { this.deserializationExceptionHandler = deserializationExceptionHandler; } - @SuppressWarnings("unchecked") public Map<TopicPartition, Long> initialize() { final Set<String> storeNames = stateMgr.initialize(processorContext); final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 9dc5640..eb75b14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1135,7 +1135,6 @@ public class StreamThread extends Thread { streamsMetrics.removeAllSensors(); } - @SuppressWarnings("ThrowableNotThrown") private void shutdownTasksAndState(final boolean cleanRun) { log.debug("{} Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", logPrefix, activeTasks.keySet(), standbyTasks.keySet(), http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index 77fb58a..8607472 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -120,9 +120,8 @@ public class AssignmentInfo { public static AssignmentInfo decode(ByteBuffer data) { // ensure we are at the beginning of the ByteBuffer data.rewind(); - DataInputStream in = new DataInputStream(new ByteBufferInputStream(data)); - try { + try (DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) { // Decode version int version = in.readInt(); if (version < 0 || version > CURRENT_VERSION) { @@ -156,7 +155,6 @@ public class AssignmentInfo { return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions); - } catch (IOException ex) { throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java index d43c613..2a54cb5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java @@ -61,7 +61,6 @@ public final class StateSerdes<K, V> { * @param valueSerde the serde for values; cannot be null * @throws IllegalArgumentException if key or value serde is null */ - @SuppressWarnings("unchecked") public StateSerdes(final String topic, final Serde<K> keySerde, final Serde<V> valueSerde) { http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 6190b88..b7d41b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -53,7 +53,6 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im this.valueSerde = valueSerde; } - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context, final StateStore root) { underlying.init(context, root); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 9a4a97c..b786ce4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -63,7 +63,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval); } - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context, final StateStore root) { underlying.init(context, root); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java index 9a826c4..34fe8f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStore.java @@ -67,7 +67,6 @@ class ChangeLoggingSegmentedBytesStore extends WrappedStateStore.AbstractStateSt @Override - @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { bytesStore.init(context, root); changeLogger = new StoreChangeLogger<>( http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 7034592..4d93a9a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -239,7 +239,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } } - @SuppressWarnings("unchecked") @Override public synchronized void put(K key, V value) { Objects.requireNonNull(key, "key cannot be null"); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index 82e5b23..75b7910 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -318,7 +318,7 @@ public class KStreamBuilderTest { final String topicName = "topic-1"; builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, topicName); - + assertTrue(builder.earliestResetTopicsPattern().matcher(topicName).matches()); assertFalse(builder.latestResetTopicsPattern().matcher(topicName).matches()); } @@ -373,7 +373,7 @@ public class KStreamBuilderTest { final String topic = "topic-5"; builder.stream(topicPattern); - + assertFalse(builder.latestResetTopicsPattern().matcher(topic).matches()); assertFalse(builder.earliestResetTopicsPattern().matcher(topic).matches()); @@ -401,7 +401,6 @@ public class KStreamBuilderTest { assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches()); } - @SuppressWarnings("unchecked") @Test public void kStreamTimestampExtractorShouldBeNull() throws Exception { builder.stream("topic"); @@ -409,7 +408,6 @@ public class KStreamBuilderTest { assertNull(processorTopology.source("topic").getTimestampExtractor()); } - @SuppressWarnings("unchecked") @Test public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() throws Exception { builder.stream(new MockTimestampExtractor(), null, null, "topic"); @@ -419,7 +417,6 @@ public class KStreamBuilderTest { } } - @SuppressWarnings("unchecked") @Test public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() throws Exception { builder.stream(null, new MockTimestampExtractor(), null, null, "topic"); @@ -427,7 +424,6 @@ public class KStreamBuilderTest { assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } - @SuppressWarnings("unchecked") @Test public void shouldAddTimestampExtractorToTablePerSource() throws Exception { builder.table("topic", "store"); @@ -435,7 +431,6 @@ public class KStreamBuilderTest { assertNull(processorTopology.source("topic").getTimestampExtractor()); } - @SuppressWarnings("unchecked") @Test public void kTableTimestampExtractorShouldBeNull() throws Exception { builder.table("topic", "store"); @@ -443,7 +438,6 @@ public class KStreamBuilderTest { assertNull(processorTopology.source("topic").getTimestampExtractor()); } - @SuppressWarnings("unchecked") @Test public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() throws Exception { builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store"); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index c69cd70..0662944 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -47,7 +47,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -@SuppressWarnings("unchecked") public class KStreamSessionWindowAggregateProcessorTest { private static final long GAP_MS = 5 * 60 * 1000L; @@ -84,7 +83,6 @@ public class KStreamSessionWindowAggregateProcessorTest { private MockProcessorContext context; - @SuppressWarnings("unchecked") @Before public void initializeStore() { final File stateDir = TestUtils.tempDirectory(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index 316494d..98bb346 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -89,6 +89,8 @@ public class WindowedStreamPartitionerTest { assertEquals(expected, actual); } } + + defaultPartitioner.close(); } @Test @@ -113,6 +115,8 @@ public class WindowedStreamPartitionerTest { Serializer<?> inner1 = windowedSerializer1.innerSerializer(); assertNotNull("Inner serializer should be not null", inner1); assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer); + windowedSerializer.close(); + windowedSerializer1.close(); } @Test @@ -137,5 +141,7 @@ public class WindowedStreamPartitionerTest { Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer(); assertNotNull("Inner deserializer should be not null", inner1); assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer); + windowedDeserializer.close(); + windowedDeserializer1.close(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index c0ce9e7..bad193a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -78,7 +78,7 @@ public class TopologyBuilderTest { @Test public void shouldAddSourcePatternWithOffsetReset() { final TopologyBuilder builder = new TopologyBuilder(); - + final String earliestTopicPattern = "earliest.*Topic"; final String latestTopicPattern = "latest.*Topic"; @@ -107,7 +107,7 @@ public class TopologyBuilderTest { final TopologyBuilder builder = new TopologyBuilder(); final Serde<String> stringSerde = Serdes.String(); final Pattern expectedPattern = Pattern.compile("test-.*"); - + builder.addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), Pattern.compile("test-.*")); assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); @@ -143,7 +143,7 @@ public class TopologyBuilderTest { } - + @Test(expected = TopologyBuilderException.class) public void testAddSourceWithSameName() { final TopologyBuilder builder = new TopologyBuilder(); @@ -579,7 +579,6 @@ public class TopologyBuilderTest { assertEquals(2, properties.size()); } - @SuppressWarnings("unchecked") @Test public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); @@ -596,7 +595,6 @@ public class TopologyBuilderTest { assertEquals(1, properties.size()); } - @SuppressWarnings("unchecked") @Test public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); @@ -702,7 +700,6 @@ public class TopologyBuilderTest { } - @SuppressWarnings("unchecked") @Test public void shouldAddTimestampExtractorPerSource() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); @@ -711,7 +708,6 @@ public class TopologyBuilderTest { assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } - @SuppressWarnings("unchecked") @Test public void shouldAddTimestampExtractorWithOffsetResetPerSource() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); @@ -720,7 +716,6 @@ public class TopologyBuilderTest { assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } - @SuppressWarnings("unchecked") @Test public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); @@ -730,7 +725,6 @@ public class TopologyBuilderTest { assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } - @SuppressWarnings("unchecked") @Test public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); @@ -740,7 +734,6 @@ public class TopologyBuilderTest { assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } - @SuppressWarnings("unchecked") @Test public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); @@ -749,7 +742,6 @@ public class TopologyBuilderTest { assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); } - @SuppressWarnings("unchecked") @Test public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java index 56e2410..e6cca87 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/QuickUnionTest.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertNotEquals; public class QuickUnionTest { - @SuppressWarnings("unchecked") @Test public void testUnite() { QuickUnion<Long> qu = new QuickUnion<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index a358be5..25c3cbd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -177,7 +177,6 @@ public class StandbyTaskTest { } - @SuppressWarnings("unchecked") @Test public void testUpdate() throws Exception { StreamsConfig config = createConfig(baseDir); @@ -224,7 +223,6 @@ public class StandbyTaskTest { } - @SuppressWarnings("unchecked") @Test public void testUpdateKTable() throws Exception { consumer.assign(Utils.mkList(ktable)); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 17eb50a..a6d1179 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -131,7 +131,6 @@ public class StreamPartitionAssignorTest { partitionAssignor.configure(configurationMap); } - @SuppressWarnings("unchecked") @Test public void testSubscription() throws Exception { builder.addSource("source1", "topic1"); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 1a0bebe..a27fb62 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -724,7 +724,6 @@ public class StreamTaskTest { }); } - @SuppressWarnings("unchecked") @Test public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception { task.close(true); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index b4598fd..c6d12c6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -185,7 +185,6 @@ public class KeyValueStoreTestDriver<K, V> { final Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer); final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") { - @SuppressWarnings("unchecked") @Override public <K1, V1> void send(final String topic, final K1 key, http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java index a77d4ac..ff7cdc3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSegmentedBytesStoreTest.java @@ -61,7 +61,6 @@ public class ChangeLoggingSegmentedBytesStoreTest { private final ChangeLoggingSegmentedBytesStore store = new ChangeLoggingSegmentedBytesStore(bytesStore); private final Map sent = new HashMap<>(); - @SuppressWarnings("unchecked") @Before public void setUp() throws Exception { context.setTime(0); @@ -74,7 +73,6 @@ public class ChangeLoggingSegmentedBytesStoreTest { store.close(); } - @SuppressWarnings("unchecked") @Test public void shouldLogPuts() throws Exception { final byte[] value1 = {0}; @@ -88,7 +86,6 @@ public class ChangeLoggingSegmentedBytesStoreTest { assertArrayEquals(value2, (byte[]) sent.get(key2)); } - @SuppressWarnings("unchecked") @Test public void shouldLogRemoves() throws Exception { final Bytes key1 = Bytes.wrap(new byte[]{0}); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java index 4054990..0fa5216 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java @@ -46,7 +46,6 @@ public class CompositeReadOnlyKeyValueStoreTest { private KeyValueStore<String, String> otherUnderlyingStore; - @SuppressWarnings("unchecked") @Before public void before() { final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false); @@ -141,8 +140,6 @@ public class CompositeReadOnlyKeyValueStoreTest { } catch (UnsupportedOperationException e) { } } - - @SuppressWarnings("unchecked") @Test public void shouldFindValueForKeyWhenMultiStores() throws Exception { final KeyValueStore<String, String> cache = newStoreInstance(); @@ -167,7 +164,6 @@ public class CompositeReadOnlyKeyValueStoreTest { assertEquals(2, results.size()); } - @SuppressWarnings("unchecked") @Test public void shouldSupportRangeAcrossMultipleKVStores() throws Exception { final KeyValueStore<String, String> cache = newStoreInstance(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java index bc21a7a..4baecb1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/DelegatingPeekingKeyValueIteratorTest.java @@ -43,6 +43,7 @@ public class DelegatingPeekingKeyValueIteratorTest { assertEquals("A", peekingIterator.peekNextKey()); assertEquals("A", peekingIterator.peekNextKey()); assertTrue(peekingIterator.hasNext()); + peekingIterator.close(); } @Test @@ -52,6 +53,7 @@ public class DelegatingPeekingKeyValueIteratorTest { assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext()); assertEquals(KeyValue.pair("A", "A"), peekingIterator.peekNext()); assertTrue(peekingIterator.hasNext()); + peekingIterator.close(); } @Test @@ -71,18 +73,21 @@ public class DelegatingPeekingKeyValueIteratorTest { index++; } assertEquals(kvs.length, index); + peekingIterator.close(); } @Test(expected = NoSuchElementException.class) public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndNextCalled() throws Exception { final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all()); peekingIterator.next(); + peekingIterator.close(); } @Test(expected = NoSuchElementException.class) public void shouldThrowNoSuchElementWhenNoMoreItemsLeftAndPeekNextCalled() throws Exception { final DelegatingPeekingKeyValueIterator<String, String> peekingIterator = new DelegatingPeekingKeyValueIterator<>(name, store.all()); peekingIterator.peekNextKey(); + peekingIterator.close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java index 6e0059f..89a4d63 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java @@ -64,6 +64,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest { values[index++] = value; assertArrayEquals(bytes[bytesIndex++], value); } + iterator.close(); } @@ -171,6 +172,7 @@ public class MergedSortedCacheKeyValueStoreIteratorTest { assertArrayEquals(bytes[bytesIndex++], keys); iterator.next(); } + iterator.close(); } private MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> createIterator() { http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java index fed39b7..2088fbe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java @@ -80,6 +80,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest { assertArrayEquals(expected.value, next.value); assertEquals(expected.key, next.key); } + iterator.close(); } @Test @@ -98,6 +99,7 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest { assertThat(iterator.peekNextKey(), equalTo(0L)); iterator.next(); assertThat(iterator.peekNextKey(), equalTo(10L)); + iterator.close(); } @Test @@ -112,5 +114,6 @@ public class MergedSortedCacheWrappedWindowStoreIteratorTest { assertThat(iterator.peekNextKey(), equalTo(0L)); iterator.next(); assertThat(iterator.peekNextKey(), equalTo(10L)); + iterator.close(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java index a7e1aed..3e935cf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStoreTest.java @@ -46,7 +46,6 @@ public class MeteredSegmentedBytesStoreTest { private final Set<String> latencyRecorded = new HashSet<>(); private final Set<String> throughputRecorded = new HashSet<>(); - @SuppressWarnings("unchecked") @Before public void setUp() throws Exception { final Metrics metrics = new Metrics(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java index 237514e..ff7d234 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplierTest.java @@ -133,7 +133,6 @@ public class RocksDBKeyValueStoreSupplierTest { assertThat(store, is(instanceOf(MeteredKeyValueStore.class))); } - @SuppressWarnings("unchecked") @Test public void shouldHaveMeteredStoreWhenCached() throws Exception { store = createStore(false, true); @@ -142,7 +141,6 @@ public class RocksDBKeyValueStoreSupplierTest { assertFalse(metrics.metrics().isEmpty()); } - @SuppressWarnings("unchecked") @Test public void shouldHaveMeteredStoreWhenLogged() throws Exception { store = createStore(true, false); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java index 70f3708..97936fc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplierTest.java @@ -112,7 +112,6 @@ public class RocksDBSessionStoreSupplierTest { assertThat(store, is(instanceOf(RocksDBSessionStore.class))); } - @SuppressWarnings("unchecked") @Test public void shouldHaveMeteredStoreWhenCached() throws Exception { store = createStore(false, true); @@ -121,7 +120,6 @@ public class RocksDBSessionStoreSupplierTest { assertFalse(metrics.metrics().isEmpty()); } - @SuppressWarnings("unchecked") @Test public void shouldHaveMeteredStoreWhenLogged() throws Exception { store = createStore(true, false); @@ -130,7 +128,6 @@ public class RocksDBSessionStoreSupplierTest { assertFalse(metrics.metrics().isEmpty()); } - @SuppressWarnings("unchecked") @Test public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception { store = createStore(false, false); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java index 77fe8ee..f177aa3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplierTest.java @@ -135,7 +135,6 @@ public class RocksDBWindowStoreSupplierTest { assertThat(store, is(instanceOf(RocksDBWindowStore.class))); } - @SuppressWarnings("unchecked") @Test public void shouldHaveMeteredStoreWhenCached() throws Exception { store = createStore(false, true, 3); @@ -144,7 +143,6 @@ public class RocksDBWindowStoreSupplierTest { assertFalse(metrics.metrics().isEmpty()); } - @SuppressWarnings("unchecked") @Test public void shouldHaveMeteredStoreWhenLogged() throws Exception { store = createStore(true, false, 3); @@ -153,7 +151,6 @@ public class RocksDBWindowStoreSupplierTest { assertFalse(metrics.metrics().isEmpty()); } - @SuppressWarnings("unchecked") @Test public void shouldHaveMeteredStoreWhenNotLoggedOrCached() throws Exception { store = createStore(false, false, 3); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java index 8a3a8ba..c2b03c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java @@ -39,7 +39,6 @@ public class StoreChangeLoggerTest { private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), new RecordCollectorImpl(null, "StoreChangeLoggerTest") { - @SuppressWarnings("unchecked") @Override public <K1, V1> void send(final String topic, final K1 key, @@ -71,7 +70,6 @@ public class StoreChangeLoggerTest { context.close(); } - @SuppressWarnings("unchecked") @Test public void testAddRemove() throws Exception { context.setTime(1); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java index 0af2594..c01e169 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java @@ -101,6 +101,7 @@ public class BrokerCompatibilityTest { System.out.println("close Kafka Streams"); + producer.close(); streams.close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 11e1ae8..9193d1d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -77,7 +77,6 @@ public class SmokeTestDriver extends SmokeTestUtil { // This main() is not used by the system test. It is intended to be used for local debugging. public static void main(String[] args) throws Exception { final String kafka = "localhost:9092"; - final String zookeeper = "localhost:2181"; final File stateDir = TestUtils.tempDirectory(); final int numKeys = 20; http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java index b9288d7..a9537a7 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java +++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java @@ -370,108 +370,109 @@ public class ClientCompatibilityTest { consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 512); ClientCompatibilityTestDeserializer deserializer = new ClientCompatibilityTestDeserializer(testConfig.expectClusterId); - final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer); - final List<PartitionInfo> partitionInfos = consumer.partitionsFor(testConfig.topic); - if (partitionInfos.size() < 1) - throw new RuntimeException("Expected at least one partition for topic " + testConfig.topic); - final Map<TopicPartition, Long> timestampsToSearch = new HashMap<>(); - final LinkedList<TopicPartition> topicPartitions = new LinkedList<>(); - for (PartitionInfo partitionInfo : partitionInfos) { - TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); - timestampsToSearch.put(topicPartition, prodTimeMs); - topicPartitions.add(topicPartition); - } - final OffsetsForTime offsetsForTime = new OffsetsForTime(); - tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported, - new Invoker() { - @Override - public void invoke() { - offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch); - } - }, - new ResultTester() { - @Override - public void test() { - log.info("offsetsForTime = {}", offsetsForTime.result); - } - }); - // Whether or not offsetsForTimes works, beginningOffsets and endOffsets - // should work. - consumer.beginningOffsets(timestampsToSearch.keySet()); - consumer.endOffsets(timestampsToSearch.keySet()); - - consumer.assign(topicPartitions); - consumer.seekToBeginning(topicPartitions); - final Iterator<byte[]> iter = new Iterator<byte[]>() { - private static final int TIMEOUT_MS = 10000; - private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = null; - private byte[] next = null; - - private byte[] fetchNext() { - while (true) { - long curTime = Time.SYSTEM.milliseconds(); - if (curTime - prodTimeMs > TIMEOUT_MS) - throw new RuntimeException("Timed out after " + TIMEOUT_MS + " ms."); - if (recordIter == null) { - ConsumerRecords<byte[], byte[]> records = consumer.poll(100); - recordIter = records.iterator(); + try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer)) { + final List<PartitionInfo> partitionInfos = consumer.partitionsFor(testConfig.topic); + if (partitionInfos.size() < 1) + throw new RuntimeException("Expected at least one partition for topic " + testConfig.topic); + final Map<TopicPartition, Long> timestampsToSearch = new HashMap<>(); + final LinkedList<TopicPartition> topicPartitions = new LinkedList<>(); + for (PartitionInfo partitionInfo : partitionInfos) { + TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); + timestampsToSearch.put(topicPartition, prodTimeMs); + topicPartitions.add(topicPartition); + } + final OffsetsForTime offsetsForTime = new OffsetsForTime(); + tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported, + new Invoker() { + @Override + public void invoke() { + offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch); + } + }, + new ResultTester() { + @Override + public void test() { + log.info("offsetsForTime = {}", offsetsForTime.result); + } + }); + // Whether or not offsetsForTimes works, beginningOffsets and endOffsets + // should work. + consumer.beginningOffsets(timestampsToSearch.keySet()); + consumer.endOffsets(timestampsToSearch.keySet()); + + consumer.assign(topicPartitions); + consumer.seekToBeginning(topicPartitions); + final Iterator<byte[]> iter = new Iterator<byte[]>() { + private static final int TIMEOUT_MS = 10000; + private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = null; + private byte[] next = null; + + private byte[] fetchNext() { + while (true) { + long curTime = Time.SYSTEM.milliseconds(); + if (curTime - prodTimeMs > TIMEOUT_MS) + throw new RuntimeException("Timed out after " + TIMEOUT_MS + " ms."); + if (recordIter == null) { + ConsumerRecords<byte[], byte[]> records = consumer.poll(100); + recordIter = records.iterator(); + } + if (recordIter.hasNext()) + return recordIter.next().value(); + recordIter = null; } - if (recordIter.hasNext()) - return recordIter.next().value(); - recordIter = null; } - } - @Override - public boolean hasNext() { - if (next != null) - return true; - next = fetchNext(); - return next != null; - } + @Override + public boolean hasNext() { + if (next != null) + return true; + next = fetchNext(); + return next != null; + } - @Override - public byte[] next() { - if (!hasNext()) - throw new NoSuchElementException(); - byte[] cur = next; - next = null; - return cur; - } + @Override + public byte[] next() { + if (!hasNext()) + throw new NoSuchElementException(); + byte[] cur = next; + next = null; + return cur; + } - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - byte[] next = iter.next(); - try { - compareArrays(message1, next); - log.debug("Found first message..."); - } catch (RuntimeException e) { - throw new RuntimeException("The first message in this topic was not ours. Please use a new topic when " + - "running this program."); - } - try { - next = iter.next(); - if (testConfig.expectRecordTooLargeException) - throw new RuntimeException("Expected to get a RecordTooLargeException when reading a record " + - "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG); + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + byte[] next = iter.next(); try { - compareArrays(message2, next); + compareArrays(message1, next); + log.debug("Found first message..."); } catch (RuntimeException e) { - System.out.println("The second message in this topic was not ours. Please use a new " + - "topic when running this program."); - Exit.exit(1); + throw new RuntimeException("The first message in this topic was not ours. Please use a new topic when " + + "running this program."); + } + try { + next = iter.next(); + if (testConfig.expectRecordTooLargeException) { + throw new RuntimeException("Expected to get a RecordTooLargeException when reading a record " + + "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG); + } + try { + compareArrays(message2, next); + } catch (RuntimeException e) { + System.out.println("The second message in this topic was not ours. Please use a new " + + "topic when running this program."); + Exit.exit(1); + } + } catch (RecordTooLargeException e) { + log.debug("Got RecordTooLargeException", e); + if (!testConfig.expectRecordTooLargeException) + throw new RuntimeException("Got an unexpected RecordTooLargeException when reading a record " + + "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG); } - } catch (RecordTooLargeException e) { - log.debug("Got RecordTooLargeException", e); - if (!testConfig.expectRecordTooLargeException) - throw new RuntimeException("Got an unexpected RecordTooLargeException when reading a record " + - "bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG); + log.debug("Closing consumer."); } - log.debug("Closing consumer."); - consumer.close(); log.info("Closed consumer."); }