MINOR: Code Cleanup

Clean up includes:
- Switching try-catch-finally blocks to try-with-resources when possible - Removing some seemingly unnecessary `SuppressWarnings` annotations - Resolving some Java warnings - Closing unclosed Closeable objects - Removing unused code Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Reviewers: Balint Molnar <balintmolna...@gmail.com>, Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <matth...@confluent.io>, Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #3222 from vahidhashemian/minor/code_cleanup_1706 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f87d58b7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f87d58b7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f87d58b7 Branch: refs/heads/trunk Commit: f87d58b796977fdaefb089d17cb30b2071cd4485 Parents: 3bfc073 Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Authored: Wed Jul 19 10:51:28 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Wed Jul 19 10:51:28 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/KafkaProducer.java | 3 +- .../producer/internals/TransactionManager.java | 1 - .../requests/OffsetsForLeaderEpochRequest.java | 4 +- .../clients/consumer/KafkaConsumerTest.java | 17 +- .../clients/producer/KafkaProducerTest.java | 33 ++-- .../clients/producer/MockProducerTest.java | 71 ++++++- .../clients/producer/internals/SenderTest.java | 9 +- .../utils/ByteBufferOutputStreamTest.java | 10 +- .../connect/file/FileStreamSourceTaskTest.java | 1 + .../connect/runtime/isolation/Plugins.java | 1 - .../connect/storage/FileOffsetBackingStore.java | 4 +- .../runtime/WorkerSinkTaskThreadedTest.java | 1 - .../storage/KafkaConfigBackingStoreTest.java | 17 +- .../kafka/connect/util/TopicAdminTest.java | 1 - .../kafka/connect/transforms/CastTest.java | 144 ++++++-------- .../connect/transforms/ExtractFieldTest.java | 9 +- 
.../kafka/connect/transforms/FlattenTest.java | 54 +++--- .../connect/transforms/HoistFieldTest.java | 9 +- .../connect/transforms/InsertFieldTest.java | 14 +- .../connect/transforms/RegexRouterTest.java | 5 +- .../connect/transforms/ReplaceFieldTest.java | 11 +- .../transforms/SetSchemaMetadataTest.java | 11 +- .../transforms/TimestampConverterTest.java | 128 ++++++------- .../connect/transforms/TimestampRouterTest.java | 8 +- .../connect/transforms/ValueToKeyTest.java | 9 +- .../apache/kafka/streams/kstream/KStream.java | 1 - .../kafka/streams/kstream/KStreamBuilder.java | 6 +- .../kstream/internals/KGroupedStreamImpl.java | 7 - .../kstream/internals/KTableKTableJoin.java | 1 - .../kstream/internals/KTableKTableLeftJoin.java | 1 - .../internals/KTableKTableOuterJoin.java | 1 - .../internals/KTableKTableRightJoin.java | 1 - .../internals/GlobalStateUpdateTask.java | 1 - .../processor/internals/StreamThread.java | 1 - .../internals/assignment/AssignmentInfo.java | 4 +- .../apache/kafka/streams/state/StateSerdes.java | 1 - .../state/internals/CachingKeyValueStore.java | 1 - .../state/internals/CachingWindowStore.java | 1 - .../ChangeLoggingSegmentedBytesStore.java | 1 - .../streams/state/internals/RocksDBStore.java | 1 - .../streams/kstream/KStreamBuilderTest.java | 10 +- ...reamSessionWindowAggregateProcessorTest.java | 2 - .../WindowedStreamPartitionerTest.java | 6 + .../streams/processor/TopologyBuilderTest.java | 14 +- .../processor/internals/QuickUnionTest.java | 1 - .../processor/internals/StandbyTaskTest.java | 2 - .../internals/StreamPartitionAssignorTest.java | 1 - .../processor/internals/StreamTaskTest.java | 1 - .../streams/state/KeyValueStoreTestDriver.java | 1 - .../ChangeLoggingSegmentedBytesStoreTest.java | 3 - .../CompositeReadOnlyKeyValueStoreTest.java | 4 - .../DelegatingPeekingKeyValueIteratorTest.java | 5 + ...gedSortedCacheKeyValueStoreIteratorTest.java | 2 + ...rtedCacheWrappedWindowStoreIteratorTest.java | 3 + 
.../MeteredSegmentedBytesStoreTest.java | 1 - .../RocksDBKeyValueStoreSupplierTest.java | 2 - .../RocksDBSessionStoreSupplierTest.java | 3 - .../RocksDBWindowStoreSupplierTest.java | 3 - .../state/internals/StoreChangeLoggerTest.java | 2 - .../streams/tests/BrokerCompatibilityTest.java | 1 + .../kafka/streams/tests/SmokeTestDriver.java | 1 - .../kafka/tools/ClientCompatibilityTest.java | 187 ++++++++++--------- 62 files changed, 419 insertions(+), 440 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index c54b739..5f854d1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -302,7 +302,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { keySerializer, valueSerializer); } - @SuppressWarnings({"unchecked", "deprecation"}) + @SuppressWarnings("unchecked") private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { try { log.trace("Starting the Kafka producer"); @@ -339,7 +339,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); this.valueSerializer = ensureExtended(valueSerializer); } - // load interceptors and make sure they get clientId userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- 
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 8cee794..c5542e4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -628,7 +628,6 @@ public class TransactionManager { } @Override - @SuppressWarnings("unchecked") public void onComplete(ClientResponse response) { if (response.requestHeader().correlationId() != inFlightRequestCorrelationId) { fatalError(new RuntimeException("Detected more than one in-flight transactional request.")); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java index f898a75..fc31d75 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java @@ -42,7 +42,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest { } public static class Builder extends AbstractRequest.Builder<OffsetsForLeaderEpochRequest> { - private Map<TopicPartition, Integer> epochsByPartition = new HashMap(); + private Map<TopicPartition, Integer> epochsByPartition = new HashMap<>(); public Builder() { super(ApiKeys.OFFSET_FOR_LEADER_EPOCH); @@ -129,7 +129,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Errors error = Errors.forException(e); - Map<TopicPartition, 
EpochEndOffset> errorResponse = new HashMap(); + Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap<>(); for (TopicPartition tp : epochsByPartition.keySet()) { errorResponse.put(tp, new EpochEndOffset(error, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 219c3f6..9fd7e19 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -129,13 +129,12 @@ public class KafkaConsumerTest { final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); try { new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + Assert.fail("should have caught an exception and returned"); } catch (KafkaException e) { assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); assertEquals("Failed to construct kafka consumer", e.getMessage()); - return; } - Assert.fail("should have caught an exception and returned"); } @Test @@ -1191,23 +1190,17 @@ public class KafkaConsumerTest { @Test(expected = IllegalStateException.class) public void testPollWithEmptySubscription() { - KafkaConsumer<byte[], byte[]> consumer = newConsumer(); - consumer.subscribe(Collections.<String>emptyList()); - try { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + consumer.subscribe(Collections.<String>emptyList()); consumer.poll(0); - } finally { - consumer.close(); } } @Test(expected = IllegalStateException.class) public void testPollWithEmptyUserAssignment() { - 
KafkaConsumer<byte[], byte[]> consumer = newConsumer(); - consumer.assign(Collections.<TopicPartition>emptySet()); - try { + try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) { + consumer.assign(Collections.<TopicPartition>emptySet()); consumer.poll(0); - } finally { - consumer.close(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 56c1b18..dd62457 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -87,16 +87,13 @@ public class KafkaProducerTest { final int oldInitCount = MockMetricsReporter.INIT_COUNT.get(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); - try { - KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>( - props, new ByteArraySerializer(), new ByteArraySerializer()); + try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) { + fail("should have caught an exception and returned"); } catch (KafkaException e) { assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get()); assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get()); assertEquals("Failed to construct kafka producer", e.getMessage()); - return; } - fail("should have caught an exception and returned"); } @Test @@ -172,9 +169,7 @@ public class KafkaProducerTest { config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); config.put(ProducerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 
Selectable.USE_DEFAULT_BUFFER_SIZE); - KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>( - config, new ByteArraySerializer(), new ByteArraySerializer()); - producer.close(); + new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()).close(); } @Test(expected = KafkaException.class) @@ -355,7 +350,7 @@ public class KafkaProducerTest { } Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic)); } - + @PrepareOnlyThisForTest(Metadata.class) @Test public void testHeaders() throws Exception { @@ -369,8 +364,6 @@ public class KafkaProducerTest { MemberModifier.field(KafkaProducer.class, "metadata").set(producer, metadata); String topic = "topic"; - Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000)); - final Cluster cluster = new Cluster( "dummy", Collections.singletonList(new Node(0, "host1", 1000)), @@ -395,9 +388,9 @@ public class KafkaProducerTest { //ensure headers can be mutated pre send. record.headers().add(new RecordHeader("test", "header2".getBytes())); - + producer.send(record, null); - + //ensure headers are closed and cannot be mutated post send try { record.headers().add(new RecordHeader("test", "test".getBytes())); @@ -405,7 +398,7 @@ public class KafkaProducerTest { } catch (IllegalStateException ise) { //expected } - + //ensure existing headers are not changed, and last header for key is still original value assertTrue(Arrays.equals(record.headers().lastHeader("test").value(), "header2".getBytes())); @@ -436,7 +429,7 @@ public class KafkaProducerTest { assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel()); } } - + @PrepareOnlyThisForTest(Metadata.class) @Test public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception { @@ -445,7 +438,7 @@ public class KafkaProducerTest { props.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1"); String topic = "topic"; ProducerRecord<String, String> record = new 
ProducerRecord<>(topic, "value"); - + KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); Metadata metadata = PowerMock.createNiceMock(Metadata.class); @@ -457,20 +450,18 @@ public class KafkaProducerTest { Collections.<String>emptySet(), Collections.<String>emptySet()); EasyMock.expect(metadata.fetch()).andReturn(cluster).once(); - + // Mock interceptors field ProducerInterceptors interceptors = PowerMock.createMock(ProducerInterceptors.class); EasyMock.expect(interceptors.onSend(record)).andReturn(record); interceptors.onSendError(EasyMock.eq(record), EasyMock.<TopicPartition>notNull(), EasyMock.<Exception>notNull()); EasyMock.expectLastCall(); MemberModifier.field(KafkaProducer.class, "interceptors").set(producer, interceptors); - + PowerMock.replay(metadata); EasyMock.replay(interceptors); producer.send(record); - + EasyMock.verify(interceptors); - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index eeb9b5f..d343194 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.test.MockSerializer; +import org.junit.After; import org.junit.Test; import java.util.ArrayList; @@ -47,14 +48,23 @@ import static org.junit.Assert.fail; public class MockProducerTest { private final String topic = "topic"; - private final MockProducer<byte[], byte[]> 
producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer()); + private MockProducer<byte[], byte[]> producer; private final ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes()); private final ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes()); + private void buildMockProducer(boolean autoComplete) { + this.producer = new MockProducer<>(autoComplete, new MockSerializer(), new MockSerializer()); + } + + @After + public void cleanup() { + if (this.producer != null && !this.producer.closed()) + this.producer.close(); + } @Test - @SuppressWarnings("unchecked") public void testAutoCompleteMock() throws Exception { + buildMockProducer(true); Future<RecordMetadata> metadata = producer.send(record1); assertTrue("Send should be immediately complete", metadata.isDone()); assertFalse("Send should be successful", isError(metadata)); @@ -77,11 +87,12 @@ public class MockProducerTest { assertEquals("Partition should be correct", 1, metadata.get().partition()); producer.clear(); assertEquals("Clear should erase our history", 0, producer.history().size()); + producer.close(); } @Test public void testManualCompletion() throws Exception { - MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); + buildMockProducer(false); Future<RecordMetadata> md1 = producer.send(record1); assertFalse("Send shouldn't have completed", md1.isDone()); Future<RecordMetadata> md2 = producer.send(record2); @@ -98,7 +109,7 @@ public class MockProducerTest { assertEquals(e, err.getCause()); } assertFalse("No more requests to complete", producer.completeNext()); - + Future<RecordMetadata> md3 = producer.send(record1); Future<RecordMetadata> md4 = producer.send(record2); assertTrue("Requests should not be completed.", !md3.isDone() && !md4.isDone()); @@ -108,12 +119,14 @@ public class MockProducerTest { @Test public void 
shouldInitTransactions() { + buildMockProducer(true); producer.initTransactions(); assertTrue(producer.transactionInitialized()); } @Test public void shouldThrowOnInitTransactionIfProducerAlreadyInitializedForTransactions() { + buildMockProducer(true); producer.initTransactions(); try { producer.initTransactions(); @@ -123,11 +136,13 @@ public class MockProducerTest { @Test(expected = IllegalStateException.class) public void shouldThrowOnBeginTransactionIfTransactionsNotInitialized() { + buildMockProducer(true); producer.beginTransaction(); } @Test public void shouldBeginTransactions() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); assertTrue(producer.transactionInFlight()); @@ -135,11 +150,13 @@ public class MockProducerTest { @Test(expected = IllegalStateException.class) public void shouldThrowOnSendOffsetsToTransactionIfTransactionsNotInitialized() { + buildMockProducer(true); producer.sendOffsetsToTransaction(null, null); } @Test public void shouldThrowOnSendOffsetsToTransactionTransactionIfNoTransactionGotStarted() { + buildMockProducer(true); producer.initTransactions(); try { producer.sendOffsetsToTransaction(null, null); @@ -149,11 +166,13 @@ public class MockProducerTest { @Test(expected = IllegalStateException.class) public void shouldThrowOnCommitIfTransactionsNotInitialized() { + buildMockProducer(true); producer.commitTransaction(); } @Test public void shouldThrowOnCommitTransactionIfNoTransactionGotStarted() { + buildMockProducer(true); producer.initTransactions(); try { producer.commitTransaction(); @@ -163,6 +182,7 @@ public class MockProducerTest { @Test public void shouldCommitEmptyTransaction() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); producer.commitTransaction(); @@ -173,6 +193,7 @@ public class MockProducerTest { @Test public void shouldCountCommittedTransaction() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); @@ -183,6 
+204,7 @@ public class MockProducerTest { @Test public void shouldNotCountAbortedTransaction() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); @@ -195,11 +217,13 @@ public class MockProducerTest { @Test(expected = IllegalStateException.class) public void shouldThrowOnAbortIfTransactionsNotInitialized() { + buildMockProducer(true); producer.abortTransaction(); } @Test public void shouldThrowOnAbortTransactionIfNoTransactionGotStarted() { + buildMockProducer(true); producer.initTransactions(); try { producer.abortTransaction(); @@ -209,6 +233,7 @@ public class MockProducerTest { @Test public void shouldAbortEmptyTransaction() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); producer.abortTransaction(); @@ -219,6 +244,7 @@ public class MockProducerTest { @Test public void shouldAbortInFlightTransactionOnClose() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); producer.close(); @@ -229,11 +255,13 @@ public class MockProducerTest { @Test(expected = IllegalStateException.class) public void shouldThrowFenceProducerIfTransactionsNotInitialized() { + buildMockProducer(true); producer.fenceProducer(); } @Test public void shouldThrowOnBeginTransactionsIfProducerGotFenced() { + buildMockProducer(true); producer.initTransactions(); producer.fenceProducer(); try { @@ -244,6 +272,7 @@ public class MockProducerTest { @Test public void shouldThrowOnSendIfProducerGotFenced() { + buildMockProducer(true); producer.initTransactions(); producer.fenceProducer(); try { @@ -254,6 +283,7 @@ public class MockProducerTest { @Test public void shouldThrowOnFlushIfProducerGotFenced() { + buildMockProducer(true); producer.initTransactions(); producer.fenceProducer(); try { @@ -264,6 +294,7 @@ public class MockProducerTest { @Test public void shouldThrowOnSendOffsetsToTransactionIfProducerGotFenced() { + buildMockProducer(true); producer.initTransactions(); producer.fenceProducer(); 
try { @@ -274,6 +305,7 @@ public class MockProducerTest { @Test public void shouldThrowOnCommitTransactionIfProducerGotFenced() { + buildMockProducer(true); producer.initTransactions(); producer.fenceProducer(); try { @@ -284,6 +316,7 @@ public class MockProducerTest { @Test public void shouldThrowOnAbortTransactionIfProducerGotFenced() { + buildMockProducer(true); producer.initTransactions(); producer.fenceProducer(); try { @@ -294,6 +327,7 @@ public class MockProducerTest { @Test public void shouldPublishMessagesOnlyAfterCommitIfTransactionsAreEnabled() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); @@ -313,7 +347,7 @@ public class MockProducerTest { @Test public void shouldFlushOnCommitForNonAutoCompleteIfTransactionsAreEnabled() { - MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); + buildMockProducer(false); producer.initTransactions(); producer.beginTransaction(); @@ -331,6 +365,7 @@ public class MockProducerTest { @Test public void shouldDropMessagesOnAbortIfTransactionsAreEnabled() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); @@ -346,7 +381,7 @@ public class MockProducerTest { @Test public void shouldThrowOnAbortForNonAutoCompleteIfTransactionsAreEnabled() throws Exception { - MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); + buildMockProducer(false); producer.initTransactions(); producer.beginTransaction(); @@ -359,6 +394,7 @@ public class MockProducerTest { @Test public void shouldPreserveCommittedMessagesOnAbortIfTransactionsAreEnabled() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); @@ -378,6 +414,7 @@ public class MockProducerTest { @Test public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); 
@@ -410,6 +447,7 @@ public class MockProducerTest { @Test public void shouldThrowOnNullConsumerGroupIdWhenSendOffsetsToTransaction() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); @@ -421,6 +459,7 @@ public class MockProducerTest { @Test public void shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransaction() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); producer.sendOffsetsToTransaction(Collections.<TopicPartition, OffsetAndMetadata>emptyMap(), "groupId"); @@ -429,6 +468,7 @@ public class MockProducerTest { @Test public void shouldAddOffsetsWhenSendOffsetsToTransaction() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); @@ -445,6 +485,7 @@ public class MockProducerTest { @Test public void shouldResetSentOffsetsFlagOnlyWhenBeginningNewTransaction() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); @@ -465,6 +506,7 @@ public class MockProducerTest { @Test public void shouldPublishLatestAndCumulativeConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEnabled() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); @@ -501,6 +543,7 @@ public class MockProducerTest { @Test public void shouldDropConsumerGroupOffsetsOnAbortIfTransactionsAreEnabled() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); @@ -521,6 +564,7 @@ public class MockProducerTest { @Test public void shouldPreserveCommittedConsumerGroupsOffsetsOnAbortIfTransactionsAreEnabled() { + buildMockProducer(true); producer.initTransactions(); producer.beginTransaction(); @@ -545,6 +589,7 @@ public class MockProducerTest { @Test public void shouldThrowOnInitTransactionIfProducerIsClosed() { + buildMockProducer(true); producer.close(); try { producer.initTransactions(); @@ -554,6 +599,7 @@ public class MockProducerTest { @Test public void shouldThrowOnSendIfProducerIsClosed() { + 
buildMockProducer(true); producer.close(); try { producer.send(null); @@ -563,6 +609,7 @@ public class MockProducerTest { @Test public void shouldThrowOnBeginTransactionIfProducerIsClosed() { + buildMockProducer(true); producer.close(); try { producer.beginTransaction(); @@ -572,6 +619,7 @@ public class MockProducerTest { @Test public void shouldThrowSendOffsetsToTransactionIfProducerIsClosed() { + buildMockProducer(true); producer.close(); try { producer.sendOffsetsToTransaction(null, null); @@ -581,6 +629,7 @@ public class MockProducerTest { @Test public void shouldThrowOnCommitTransactionIfProducerIsClosed() { + buildMockProducer(true); producer.close(); try { producer.commitTransaction(); @@ -590,6 +639,7 @@ public class MockProducerTest { @Test public void shouldThrowOnAbortTransactionIfProducerIsClosed() { + buildMockProducer(true); producer.close(); try { producer.abortTransaction(); @@ -599,6 +649,7 @@ public class MockProducerTest { @Test public void shouldThrowOnCloseIfProducerIsClosed() { + buildMockProducer(true); producer.close(); try { producer.close(); @@ -608,6 +659,7 @@ public class MockProducerTest { @Test public void shouldThrowOnFenceProducerIfProducerIsClosed() { + buildMockProducer(true); producer.close(); try { producer.fenceProducer(); @@ -617,6 +669,7 @@ public class MockProducerTest { @Test public void shouldThrowOnFlushProducerIfProducerIsClosed() { + buildMockProducer(true); producer.close(); try { producer.flush(); @@ -626,25 +679,27 @@ public class MockProducerTest { @Test public void shouldBeFlushedIfNoBufferedRecords() { + buildMockProducer(true); assertTrue(producer.flushed()); } @Test public void shouldBeFlushedWithAutoCompleteIfBufferedRecords() { + buildMockProducer(true); producer.send(record1); assertTrue(producer.flushed()); } @Test public void shouldNotBeFlushedWithNoAutoCompleteIfBufferedRecords() { - MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); + 
buildMockProducer(false); producer.send(record1); assertFalse(producer.flushed()); } @Test public void shouldNotBeFlushedAfterFlush() { - MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer()); + buildMockProducer(false); producer.send(record1); producer.flush(); assertTrue(producer.flushed()); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 0537a35..d587de4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -633,10 +633,9 @@ public class SenderTest { String topic = tp.topic(); // Set a good compression ratio. 
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f); - Metrics m = new Metrics(); - accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time, - new ApiVersions(), txnManager); - try { + try (Metrics m = new Metrics()) { + accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time, + new ApiVersions(), txnManager); Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions()); // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 @@ -701,8 +700,6 @@ public class SenderTest { assertTrue("There should be a split", m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0); - } finally { - m.close(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java index fbac719..26ba3b8 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteBufferOutputStreamTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.utils; import org.junit.Test; +import java.io.IOException; import java.nio.ByteBuffer; import static org.junit.Assert.assertArrayEquals; @@ -49,6 +50,7 @@ public class ByteBufferOutputStreamTest { byte[] bytes = new byte[5]; buffer.get(bytes); assertArrayEquals("hello".getBytes(), bytes); + output.close(); } @Test @@ -75,19 +77,20 @@ public class ByteBufferOutputStreamTest { byte[] bytes = new byte[5]; buffer.get(bytes); 
assertArrayEquals("hello".getBytes(), bytes); + output.close(); } @Test - public void testWriteByteBuffer() { + public void testWriteByteBuffer() throws IOException { testWriteByteBuffer(ByteBuffer.allocate(16)); } @Test - public void testWriteDirectByteBuffer() { + public void testWriteDirectByteBuffer() throws IOException { testWriteByteBuffer(ByteBuffer.allocateDirect(16)); } - private void testWriteByteBuffer(ByteBuffer input) { + private void testWriteByteBuffer(ByteBuffer input) throws IOException { long value = 234239230L; input.putLong(value); input.flip(); @@ -97,6 +100,7 @@ public class ByteBufferOutputStreamTest { assertEquals(8, input.position()); assertEquals(8, output.position()); assertEquals(value, output.buffer().getLong(0)); + output.close(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java index eb91dbd..03fb774 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java @@ -123,6 +123,7 @@ public class FileStreamSourceTaskTest { assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition()); assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset()); + os.close(); task.stop(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java ---------------------------------------------------------------------- diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 654f485..f7a5553 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -137,7 +137,6 @@ public class Plugins { return delegatingLoader.transformations(); } - @SuppressWarnings("unchecked") public Connector newConnector(String connectorClassOrAlias) { Class<? extends Connector> klass; try { http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java index 32f5a38..0547fe6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java @@ -69,8 +69,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore { @SuppressWarnings("unchecked") private void load() { - try { - ObjectInputStream is = new ObjectInputStream(new FileInputStream(file)); + try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(file))) { Object obj = is.readObject(); if (!(obj instanceof HashMap)) throw new ConnectException("Expected HashMap but found " + obj.getClass()); @@ -81,7 +80,6 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore { ByteBuffer value = (mapEntry.getValue() != null) ? 
ByteBuffer.wrap(mapEntry.getValue()) : null; data.put(key, value); } - is.close(); } catch (FileNotFoundException | EOFException e) { // FileNotFoundException: Ignore, may be new. // EOFException: Ignore, this means the file was missing or corrupt http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 29a6b52..6f77f65 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -115,7 +115,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest { private long recordsReturned; - @SuppressWarnings("unchecked") @Override public void setup() { super.setup(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 07d192b..e9dd18e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -135,7 +135,6 @@ public class KafkaConfigBackingStoreTest { KafkaBasedLog<String, byte[]> storeLog; private KafkaConfigBackingStore configStorage; - private String internalTopic; private Capture<String> capturedTopic = 
EasyMock.newCapture(); private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture(); private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture(); @@ -363,7 +362,7 @@ public class KafkaConfigBackingStoreTest { new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4))); - LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>(); deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); @@ -403,7 +402,7 @@ public class KafkaConfigBackingStoreTest { new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3))); - LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>(); deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); @@ -445,7 +444,7 @@ public class KafkaConfigBackingStoreTest { new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1)), new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3))); - LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>(); deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); @@ -494,7 +493,7 @@ public class KafkaConfigBackingStoreTest { new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4))); - LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>(); deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); @@ -540,7 +539,7 @@ public class KafkaConfigBackingStoreTest { // Connector after root update should make it through, task update shouldn't new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)), new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6))); - LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + LinkedHashMap<byte[], Struct> deserialized = new 
LinkedHashMap<>(); deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); @@ -594,7 +593,7 @@ public class KafkaConfigBackingStoreTest { new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5))); - LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>(); deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); @@ -640,7 +639,7 @@ public class KafkaConfigBackingStoreTest { new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)), new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)), new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7))); - LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>(); deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); @@ -691,7 +690,7 @@ public class KafkaConfigBackingStoreTest { new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), new ConsumerRecord<>(TOPIC, 0, 4, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5))); - LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>(); deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index bafbce8..f90b77f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -49,7 +49,6 @@ public class TopicAdminTest { @Test public void returnNullWithApiVersionMismatch() { final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); - boolean internal = false; Cluster cluster = createCluster(1); try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { env.kafkaClient().setNode(cluster.controller()); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java index 88afafc..b190189 100644 
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; import org.junit.Test; import java.util.Collections; @@ -34,42 +35,44 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class CastTest { + private final Cast<SourceRecord> xformKey = new Cast.Key<>(); + private final Cast<SourceRecord> xformValue = new Cast.Value<>(); + + @After + public void teardown() { + xformKey.close(); + xformValue.close(); + } @Test(expected = ConfigException.class) public void testConfigEmpty() { - final Cast<SourceRecord> xform = new Cast.Key<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "")); + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "")); } @Test(expected = ConfigException.class) public void testConfigInvalidSchemaType() { - final Cast<SourceRecord> xform = new Cast.Key<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:faketype")); + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:faketype")); } @Test(expected = ConfigException.class) public void testConfigInvalidTargetType() { - final Cast<SourceRecord> xform = new Cast.Key<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array")); + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array")); } @Test(expected = ConfigException.class) public void testConfigInvalidMap() { - final Cast<SourceRecord> xform = new Cast.Key<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra")); + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra")); } @Test(expected = 
ConfigException.class) public void testConfigMixWholeAndFieldTransformation() { - final Cast<SourceRecord> xform = new Cast.Key<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32")); + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32")); } @Test public void castWholeRecordKeyWithSchema() { - final Cast<SourceRecord> xform = new Cast.Key<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); + SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42, Schema.STRING_SCHEMA, "bogus")); assertEquals(Schema.Type.INT8, transformed.keySchema().type()); @@ -78,9 +81,8 @@ public class CastTest { @Test public void castWholeRecordValueWithSchemaInt8() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42)); assertEquals(Schema.Type.INT8, transformed.valueSchema().type()); @@ -89,9 +91,8 @@ public class CastTest { @Test public void castWholeRecordValueWithSchemaInt16() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42)); assertEquals(Schema.Type.INT16, transformed.valueSchema().type()); @@ -100,9 +101,8 @@ public 
class CastTest { @Test public void castWholeRecordValueWithSchemaInt32() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42)); assertEquals(Schema.Type.INT32, transformed.valueSchema().type()); @@ -111,9 +111,8 @@ public class CastTest { @Test public void castWholeRecordValueWithSchemaInt64() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42)); assertEquals(Schema.Type.INT64, transformed.valueSchema().type()); @@ -122,9 +121,8 @@ public class CastTest { @Test public void castWholeRecordValueWithSchemaFloat32() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42)); assertEquals(Schema.Type.FLOAT32, transformed.valueSchema().type()); @@ -133,9 +131,8 @@ public class CastTest { @Test public void castWholeRecordValueWithSchemaFloat64() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 
0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42)); assertEquals(Schema.Type.FLOAT64, transformed.valueSchema().type()); @@ -144,9 +141,8 @@ public class CastTest { @Test public void castWholeRecordValueWithSchemaBooleanTrue() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42)); assertEquals(Schema.Type.BOOLEAN, transformed.valueSchema().type()); @@ -155,9 +151,8 @@ public class CastTest { @Test public void castWholeRecordValueWithSchemaBooleanFalse() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 0)); assertEquals(Schema.Type.BOOLEAN, transformed.valueSchema().type()); @@ -166,9 +161,8 @@ public class CastTest { @Test public void castWholeRecordValueWithSchemaString() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42)); assertEquals(Schema.Type.STRING, transformed.valueSchema().type()); 
@@ -178,9 +172,8 @@ public class CastTest { @Test public void castWholeRecordDefaultValue() { // Validate default value in schema is correctly converted - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, SchemaBuilder.float32().defaultValue(-42.125f).build(), 42.125f)); assertEquals(Schema.Type.INT32, transformed.valueSchema().type()); @@ -190,9 +183,8 @@ public class CastTest { @Test public void castWholeRecordKeySchemaless() { - final Cast<SourceRecord> xform = new Cast.Key<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); + SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0, null, 42, Schema.STRING_SCHEMA, "bogus")); assertNull(transformed.keySchema()); @@ -201,9 +193,8 @@ public class CastTest { @Test public void castWholeRecordValueSchemalessInt8() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42)); assertNull(transformed.valueSchema()); @@ -212,9 +203,8 @@ public class CastTest { @Test public void castWholeRecordValueSchemalessInt16() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16")); - SourceRecord transformed = xform.apply(new 
SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42)); assertNull(transformed.valueSchema()); @@ -223,9 +213,8 @@ public class CastTest { @Test public void castWholeRecordValueSchemalessInt32() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42)); assertNull(transformed.valueSchema()); @@ -234,9 +223,8 @@ public class CastTest { @Test public void castWholeRecordValueSchemalessInt64() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42)); assertNull(transformed.valueSchema()); @@ -245,9 +233,8 @@ public class CastTest { @Test public void castWholeRecordValueSchemalessFloat32() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42)); assertNull(transformed.valueSchema()); @@ -256,9 +243,8 @@ public class CastTest { @Test public void castWholeRecordValueSchemalessFloat64() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - 
xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42)); assertNull(transformed.valueSchema()); @@ -267,9 +253,8 @@ public class CastTest { @Test public void castWholeRecordValueSchemalessBooleanTrue() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42)); assertNull(transformed.valueSchema()); @@ -278,9 +263,8 @@ public class CastTest { @Test public void castWholeRecordValueSchemalessBooleanFalse() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 0)); assertNull(transformed.valueSchema()); @@ -289,9 +273,8 @@ public class CastTest { @Test public void castWholeRecordValueSchemalessString() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string")); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string")); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42)); assertNull(transformed.valueSchema()); @@ -300,16 +283,14 @@ 
public class CastTest { @Test(expected = DataException.class) public void castWholeRecordValueSchemalessUnsupportedType() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); - xform.apply(new SourceRecord(null, null, "topic", 0, null, Collections.singletonList("foo"))); + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8")); + xformValue.apply(new SourceRecord(null, null, "topic", 0, null, Collections.singletonList("foo"))); } @Test public void castFieldsWithSchema() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32")); + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32")); // Include an optional fields and fields with defaults to validate their values are passed through properly SchemaBuilder builder = SchemaBuilder.struct(); @@ -336,7 +317,7 @@ public class CastTest { recordValue.put("string", "42"); // optional field intentionally omitted - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, supportedTypesSchema, recordValue)); assertEquals((short) 8, ((Struct) transformed.value()).get("int8")); @@ -356,8 +337,7 @@ public class CastTest { @SuppressWarnings("unchecked") @Test public void castFieldsSchemaless() { - final Cast<SourceRecord> xform = new Cast.Value<>(); - xform.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32")); + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, 
"int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32")); Map<String, Object> recordValue = new HashMap<>(); recordValue.put("int8", (byte) 8); recordValue.put("int16", (short) 16); @@ -367,7 +347,7 @@ public class CastTest { recordValue.put("float64", -64.); recordValue.put("boolean", true); recordValue.put("string", "42"); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, recordValue)); assertNull(transformed.valueSchema()); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java index b54a908..0b7ce96 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.After; import org.junit.Test; import java.util.Collections; @@ -28,10 +29,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; public class ExtractFieldTest { + private final ExtractField<SinkRecord> xform = new ExtractField.Key<>(); + + @After + public void teardown() { + xform.close(); + } @Test public void schemaless() { - final ExtractField<SinkRecord> xform = new ExtractField.Key<>(); xform.configure(Collections.singletonMap("field", "magic")); final SinkRecord record = 
new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0); @@ -43,7 +49,6 @@ public class ExtractFieldTest { @Test public void withSchema() { - final ExtractField<SinkRecord> xform = new ExtractField.Key<>(); xform.configure(Collections.singletonMap("field", "magic")); final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java index 86851f3..d709054 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; import org.junit.Test; import java.util.Arrays; @@ -36,25 +37,30 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class FlattenTest { + private final Flatten<SourceRecord> xformKey = new Flatten.Key<>(); + private final Flatten<SourceRecord> xformValue = new Flatten.Value<>(); + + @After + public void teardown() { + xformKey.close(); + xformValue.close(); + } @Test(expected = DataException.class) public void topLevelStructRequired() { - final Flatten<SourceRecord> xform = new Flatten.Value<>(); - xform.configure(Collections.<String, String>emptyMap()); - xform.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42)); + xformValue.configure(Collections.<String, 
String>emptyMap()); + xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT32_SCHEMA, 42)); } @Test(expected = DataException.class) public void topLevelMapRequired() { - final Flatten<SourceRecord> xform = new Flatten.Value<>(); - xform.configure(Collections.<String, String>emptyMap()); - xform.apply(new SourceRecord(null, null, "topic", 0, null, 42)); + xformValue.configure(Collections.<String, String>emptyMap()); + xformValue.apply(new SourceRecord(null, null, "topic", 0, null, 42)); } @Test public void testNestedStruct() { - final Flatten<SourceRecord> xform = new Flatten.Value<>(); - xform.configure(Collections.<String, String>emptyMap()); + xformValue.configure(Collections.<String, String>emptyMap()); SchemaBuilder builder = SchemaBuilder.struct(); builder.field("int8", Schema.INT8_SCHEMA); @@ -93,7 +99,7 @@ public class FlattenTest { Struct twoLevelNestedStruct = new Struct(twoLevelNestedSchema); twoLevelNestedStruct.put("A", oneLevelNestedStruct); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, twoLevelNestedSchema, twoLevelNestedStruct)); @@ -113,8 +119,7 @@ public class FlattenTest { @Test public void testNestedMapWithDelimiter() { - final Flatten<SourceRecord> xform = new Flatten.Value<>(); - xform.configure(Collections.singletonMap("delimiter", "#")); + xformValue.configure(Collections.singletonMap("delimiter", "#")); Map<String, Object> supportedTypes = new HashMap<>(); supportedTypes.put("int8", (byte) 8); @@ -130,7 +135,7 @@ public class FlattenTest { Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", (Object) supportedTypes); Map<String, Object> twoLevelNestedMap = Collections.singletonMap("A", (Object) oneLevelNestedMap); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, twoLevelNestedMap)); @@ 
-151,8 +156,7 @@ public class FlattenTest { @Test public void testOptionalFieldStruct() { - final Flatten<SourceRecord> xform = new Flatten.Value<>(); - xform.configure(Collections.<String, String>emptyMap()); + xformValue.configure(Collections.<String, String>emptyMap()); SchemaBuilder builder = SchemaBuilder.struct(); builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA); @@ -168,7 +172,7 @@ public class FlattenTest { Struct oneLevelNestedStruct = new Struct(oneLevelNestedSchema); oneLevelNestedStruct.put("B", supportedTypes); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, oneLevelNestedSchema, oneLevelNestedStruct)); @@ -179,15 +183,14 @@ public class FlattenTest { @Test public void testOptionalFieldMap() { - final Flatten<SourceRecord> xform = new Flatten.Value<>(); - xform.configure(Collections.<String, String>emptyMap()); + xformValue.configure(Collections.<String, String>emptyMap()); Map<String, Object> supportedTypes = new HashMap<>(); supportedTypes.put("opt_int32", null); Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", (Object) supportedTypes); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, oneLevelNestedMap)); @@ -200,12 +203,11 @@ public class FlattenTest { @Test public void testKey() { - final Flatten<SourceRecord> xform = new Flatten.Key<>(); - xform.configure(Collections.<String, String>emptyMap()); + xformKey.configure(Collections.<String, String>emptyMap()); Map<String, Map<String, Integer>> key = Collections.singletonMap("A", Collections.singletonMap("B", 12)); SourceRecord src = new SourceRecord(null, null, "topic", null, key, null, null); - SourceRecord transformed = xform.apply(src); + SourceRecord transformed = xformKey.apply(src); assertNull(transformed.keySchema()); assertTrue(transformed.key() 
instanceof Map); @@ -215,10 +217,9 @@ public class FlattenTest { @Test(expected = DataException.class) public void testUnsupportedTypeInMap() { - final Flatten<SourceRecord> xform = new Flatten.Value<>(); - xform.configure(Collections.<String, String>emptyMap()); + xformValue.configure(Collections.<String, String>emptyMap()); Object value = Collections.singletonMap("foo", Arrays.asList("bar", "baz")); - xform.apply(new SourceRecord(null, null, "topic", 0, null, value)); + xformValue.apply(new SourceRecord(null, null, "topic", 0, null, value)); } @Test @@ -227,8 +228,7 @@ public class FlattenTest { // children should also be optional. Similarly, if the parent Struct has a default value, the default value for // the flattened field - final Flatten<SourceRecord> xform = new Flatten.Value<>(); - xform.configure(Collections.<String, String>emptyMap()); + xformValue.configure(Collections.<String, String>emptyMap()); SchemaBuilder builder = SchemaBuilder.struct().optional(); builder.field("req_field", Schema.STRING_SCHEMA); @@ -240,7 +240,7 @@ public class FlattenTest { // Intentionally leave this entire value empty since it is optional Struct value = new Struct(schema); - SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, schema, value)); + SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, schema, value)); assertNotNull(transformed); Schema transformedSchema = transformed.valueSchema(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java index 299aab3..1135b85 100644 --- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.connect.transforms; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.After; import org.junit.Test; import java.util.Collections; @@ -27,10 +28,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; public class HoistFieldTest { + private final HoistField<SinkRecord> xform = new HoistField.Key<>(); + + @After + public void teardown() { + xform.close(); + } @Test public void schemaless() { - final HoistField<SinkRecord> xform = new HoistField.Key<>(); xform.configure(Collections.singletonMap("field", "magic")); final SinkRecord record = new SinkRecord("test", 0, null, 42, null, null, 0); @@ -42,7 +48,6 @@ public class HoistFieldTest { @Test public void withSchema() { - final HoistField<SinkRecord> xform = new HoistField.Key<>(); xform.configure(Collections.singletonMap("field", "magic")); final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java index 4ce6ad4..a0a0975 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertFieldTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.data.Struct; import 
org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; import org.junit.Test; import java.util.Collections; @@ -32,14 +33,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; public class InsertFieldTest { + private InsertField<SourceRecord> xform = new InsertField.Value<>(); + + @After + public void teardown() { + xform.close(); + } @Test(expected = DataException.class) public void topLevelStructRequired() { - final InsertField<SourceRecord> xform = new InsertField.Value<>(); xform.configure(Collections.singletonMap("topic.field", "topic_field")); - xform.apply(new SourceRecord(null, null, - "", 0, - Schema.INT32_SCHEMA, 42)); + xform.apply(new SourceRecord(null, null, "", 0, Schema.INT32_SCHEMA, 42)); } @Test @@ -51,7 +55,6 @@ public class InsertFieldTest { props.put("static.field", "instance_id"); props.put("static.value", "my-instance-id"); - final InsertField<SourceRecord> xform = new InsertField.Value<>(); xform.configure(props); final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build(); @@ -94,7 +97,6 @@ public class InsertFieldTest { props.put("static.field", "instance_id"); props.put("static.value", "my-instance-id"); - final InsertField<SourceRecord> xform = new InsertField.Value<>(); xform.configure(props); final SourceRecord record = new SourceRecord(null, null, "test", 0, http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java index cc001f1..aa47001 100644 
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java @@ -32,8 +32,9 @@ public class RegexRouterTest { props.put("replacement", replacement); final RegexRouter<SinkRecord> router = new RegexRouter<>(); router.configure(props); - return router.apply(new SinkRecord(topic, 0, null, null, null, null, 0)) - .topic(); + String sinkTopic = router.apply(new SinkRecord(topic, 0, null, null, null, null, 0)).topic(); + router.close(); + return sinkTopic; } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java index e3d9d3a..6a1a13a 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.After; import org.junit.Test; import java.util.HashMap; @@ -28,11 +29,15 @@ import java.util.Map; import static org.junit.Assert.assertEquals; public class ReplaceFieldTest { + private ReplaceField<SinkRecord> xform = new ReplaceField.Value<>(); + + @After + public void teardown() { + xform.close(); + } @Test public void schemaless() { - final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>(); - final Map<String, String> props = new HashMap<>(); props.put("blacklist", "dont"); props.put("renames", "abc:xyz,foo:bar"); @@ 
-57,8 +62,6 @@ public class ReplaceFieldTest { @Test public void withSchema() { - final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>(); - final Map<String, String> props = new HashMap<>(); props.put("whitelist", "abc,foo"); props.put("renames", "abc:xyz,foo:bar"); http://git-wip-us.apache.org/repos/asf/kafka/blob/f87d58b7/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java ---------------------------------------------------------------------- diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java index 206c51e..257b382 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.After; import org.junit.Test; import java.util.Collections; @@ -31,10 +32,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; public class SetSchemaMetadataTest { + private final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>(); + + @After + public void teardown() { + xform.close(); + } @Test public void schemaNameUpdate() { - final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>(); xform.configure(Collections.singletonMap("schema.name", "foo")); final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0); final SinkRecord updatedRecord = xform.apply(record); @@ -43,7 +49,6 @@ public class SetSchemaMetadataTest { @Test public void schemaVersionUpdate() { - final SetSchemaMetadata<SinkRecord> 
xform = new SetSchemaMetadata.Value<>(); xform.configure(Collections.singletonMap("schema.version", 42)); final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0); final SinkRecord updatedRecord = xform.apply(record); @@ -56,7 +61,6 @@ public class SetSchemaMetadataTest { props.put("schema.name", "foo"); props.put("schema.version", "42"); - final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>(); xform.configure(props); final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0); @@ -83,7 +87,6 @@ public class SetSchemaMetadataTest { final Map<String, String> props = new HashMap<>(); props.put("schema.name", "foo"); props.put("schema.version", "42"); - final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>(); xform.configure(props); final SinkRecord record = new SinkRecord("", 0, null, null, schema, value, 0);