Repository: kafka Updated Branches: refs/heads/trunk c4d629a0b -> 607c3c21f
KAFKA-5755; KafkaProducer should be refactored to use LogContext. With LogContext, each producer log item is automatically prefixed with client id and transactional id. Author: huxihx <huxi...@hotmail.com> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #3703 from huxihx/KAFKA-5755 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/607c3c21 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/607c3c21 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/607c3c21 Branch: refs/heads/trunk Commit: 607c3c21f652a8f911d1efb8374d2eec313a4e2d Parents: c4d629a Author: huxihx <huxi...@hotmail.com> Authored: Fri Aug 25 10:38:15 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri Aug 25 10:42:40 2017 -0700 ---------------------------------------------------------------------- checkstyle/suppressions.xml | 2 + .../kafka/clients/producer/KafkaProducer.java | 38 +++++---- .../producer/internals/RecordAccumulator.java | 10 ++- .../clients/producer/internals/Sender.java | 8 +- .../internals/RecordAccumulatorTest.java | 86 ++++++++++++-------- .../clients/producer/internals/SenderTest.java | 20 +++-- .../internals/TransactionManagerTest.java | 6 +- 7 files changed, 103 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 7f3c4b6..e4c244f 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -35,6 +35,8 @@ files="ConfigDef.java"/> <suppress checks="ParameterNumber" files="DefaultRecordBatch.java"/> + <suppress checks="ParameterNumber" + files="Sender.java"/> <suppress checks="ClassDataAbstractionCoupling" 
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/> http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index ce2efba..ba0d848 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -58,9 +58,9 @@ import org.apache.kafka.common.serialization.ExtendedSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.Collections; @@ -224,7 +224,7 @@ import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.e */ public class KafkaProducer<K, V> implements Producer<K, V> { - private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class); + private final Logger log; private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); private static final String JMX_PREFIX = "kafka.producer"; public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread"; @@ -305,13 +305,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> { @SuppressWarnings("unchecked") private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { try { - log.trace("Starting the Kafka producer"); Map<String, Object> userProvidedConfigs = config.originals(); 
this.producerConfig = config; this.time = Time.SYSTEM; clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); + + String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ? + (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null; + LogContext logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s", clientId, transactionalId)); + log = logContext.logger(KafkaProducer.class); + log.trace("Starting the Kafka producer"); + Map<String, String> metricTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) @@ -354,13 +360,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> { this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); - this.transactionManager = configureTransactionState(config); - int retries = configureRetries(config, transactionManager != null); - int maxInflightRequests = configureInflightRequests(config, transactionManager != null); - short acks = configureAcks(config, transactionManager != null); + this.transactionManager = configureTransactionState(config, log); + int retries = configureRetries(config, transactionManager != null, log); + int maxInflightRequests = configureInflightRequests(config, transactionManager != null, log); + short acks = configureAcks(config, transactionManager != null, log); this.apiVersions = new ApiVersions(); - this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), + this.accumulator = new RecordAccumulator(logContext, + config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), 
this.totalMemorySize, this.compressionType, config.getLong(ProducerConfig.LINGER_MS_CONFIG), @@ -388,7 +395,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { true, apiVersions, throttleTimeSensor); - this.sender = new Sender(client, + this.sender = new Sender(logContext, + client, this.metadata, this.accumulator, maxInflightRequests == 1, @@ -407,7 +415,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { this.errors = this.metrics.sensor("errors"); config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); - log.debug("Kafka producer with client id {} created", clientId); + log.debug("Kafka producer started"); } catch (Throwable t) { // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121 close(0, TimeUnit.MILLISECONDS, true); @@ -416,7 +424,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { } } - private static TransactionManager configureTransactionState(ProducerConfig config) { + private static TransactionManager configureTransactionState(ProducerConfig config, Logger log) { TransactionManager transactionManager = null; @@ -450,7 +458,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { return transactionManager; } - private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled) { + private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled, Logger log) { boolean userConfiguredRetries = false; if (config.originals().containsKey(ProducerConfig.RETRIES_CONFIG)) { userConfiguredRetries = true; @@ -468,7 +476,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { return config.getInt(ProducerConfig.RETRIES_CONFIG); } - private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) { + private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled, Logger log) { boolean userConfiguredInflights = false; if 
(config.originals().containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) { userConfiguredInflights = true; @@ -484,7 +492,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); } - private static short configureAcks(ProducerConfig config, boolean idempotenceEnabled) { + private static short configureAcks(ProducerConfig config, boolean idempotenceEnabled, Logger log) { boolean userConfiguredAcks = false; short acks = (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)); if (config.originals().containsKey(ProducerConfig.ACKS_CONFIG)) { @@ -1055,7 +1063,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException); ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException); AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId); - log.debug("Kafka producer with client id {} has been closed", clientId); + log.debug("Kafka producer has been closed"); if (firstException.get() != null && !swallowException) throw new KafkaException("Failed to close kafka producer", firstException.get()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 9b9aa02..38b5e51 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -39,10 +39,10 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; import 
org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.CopyOnWriteMap; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayDeque; @@ -67,8 +67,7 @@ import java.util.concurrent.atomic.AtomicInteger; */ public final class RecordAccumulator { - private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class); - + private final Logger log; private volatile boolean closed; private final AtomicInteger flushesInProgress; private final AtomicInteger appendsInProgress; @@ -89,6 +88,7 @@ public final class RecordAccumulator { /** * Create a new record accumulator * + * @param logContext The log context used for logging * @param batchSize The size to use when allocating {@link MemoryRecords} instances * @param totalSize The maximum memory the record accumulator can use. * @param compression The compression codec for the records @@ -103,7 +103,8 @@ public final class RecordAccumulator { * @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence * numbers per partition. 
*/ - public RecordAccumulator(int batchSize, + public RecordAccumulator(LogContext logContext, + int batchSize, long totalSize, CompressionType compression, long lingerMs, @@ -112,6 +113,7 @@ public final class RecordAccumulator { Time time, ApiVersions apiVersions, TransactionManager transactionManager) { + this.log = logContext.logger(RecordAccumulator.class); this.drainIndex = 0; this.closed = false; this.flushesInProgress = new AtomicInteger(0); http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 411282b..9c3b4d2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -53,10 +53,10 @@ import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; @@ -73,7 +73,7 @@ import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP; */ public class Sender implements Runnable { - private static final Logger log = LoggerFactory.getLogger(Sender.class); + private final Logger log; /* the state of each nodes connection */ private final KafkaClient client; @@ -120,7 +120,8 @@ public class Sender implements Runnable { /* all the state related to transactions, in particular the producer id, producer epoch, 
and sequence numbers */ private final TransactionManager transactionManager; - public Sender(KafkaClient client, + public Sender(LogContext logContext, + KafkaClient client, Metadata metadata, RecordAccumulator accumulator, boolean guaranteeMessageOrder, @@ -133,6 +134,7 @@ public class Sender implements Runnable { long retryBackoffMs, TransactionManager transactionManager, ApiVersions apiVersions) { + this.log = logContext.logger(Sender.class); this.client = client; this.accumulator = accumulator; this.metadata = metadata; http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 99e7557..d486c10 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.TestUtils; @@ -88,6 +89,7 @@ public class RecordAccumulatorTest { Collections.<String>emptySet(), Collections.<String>emptySet()); private Metrics metrics = new Metrics(time); private final long maxBlockTimeMs = 1000; + private final LogContext logContext = new LogContext(); @After public void teardown() { @@ -101,8 +103,8 @@ public class RecordAccumulatorTest { // test case assumes that the records do not 
fill the batch completely int batchSize = 1025; - RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, - CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null); + RecordAccumulator accum = createTestRecordAccumulator( + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10L); int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { // append to the first batch @@ -150,8 +152,8 @@ public class RecordAccumulatorTest { private void testAppendLarge(CompressionType compressionType) throws Exception { int batchSize = 512; byte[] value = new byte[2 * batchSize]; - RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - compressionType, 0L, 100L, metrics, time, new ApiVersions(), null); + RecordAccumulator accum = createTestRecordAccumulator( + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L); accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -189,8 +191,8 @@ public class RecordAccumulatorTest { apiVersions.update(node1.idString(), NodeApiVersions.create(Collections.singleton( new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2)))); - RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - compressionType, 0L, 100L, metrics, time, apiVersions, null); + RecordAccumulator accum = createTestRecordAccumulator( + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L); accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, 
time.milliseconds()).readyNodes); @@ -213,8 +215,8 @@ public class RecordAccumulatorTest { @Test public void testLinger() throws Exception { long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null); + RecordAccumulator accum = createTestRecordAccumulator( + 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs); accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); @@ -232,8 +234,8 @@ public class RecordAccumulatorTest { @Test public void testPartialDrain() throws Exception { - RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null); + RecordAccumulator accum = createTestRecordAccumulator( + 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 10L); int appends = 1024 / msgSize + 1; List<TopicPartition> partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { @@ -252,8 +254,8 @@ public class RecordAccumulatorTest { final int numThreads = 5; final int msgs = 10000; final int numParts = 2; - final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null); + final RecordAccumulator accum = createTestRecordAccumulator( + 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0L); List<Thread> threads = new ArrayList<>(); for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { @@ -297,8 +299,8 @@ public class RecordAccumulatorTest { // test case assumes that the records do not fill the batch completely int 
batchSize = 1025; - RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, - CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null); + RecordAccumulator accum = createTestRecordAccumulator( + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs); // Just short of going over the limit so we trigger linger time int appends = expectedNumAppends(batchSize); @@ -332,7 +334,7 @@ public class RecordAccumulatorTest { public void testRetryBackoff() throws Exception { long lingerMs = Long.MAX_VALUE / 4; long retryBackoffMs = Long.MAX_VALUE / 2; - final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, + final RecordAccumulator accum = new RecordAccumulator(logContext, 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null); long now = time.milliseconds(); @@ -370,8 +372,9 @@ public class RecordAccumulatorTest { @Test public void testFlush() throws Exception { long lingerMs = Long.MAX_VALUE; - final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, - CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null); + final RecordAccumulator accum = createTestRecordAccumulator( + 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs); + for (int i = 0; i < 100; i++) { accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); assertTrue(accum.hasIncomplete()); @@ -409,8 +412,8 @@ public class RecordAccumulatorTest { @Test public void testAwaitFlushComplete() throws Exception { - RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, - CompressionType.NONE, Long.MAX_VALUE, 100L, 
metrics, time, new ApiVersions(), null); + RecordAccumulator accum = createTestRecordAccumulator( + 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE); accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); accum.beginFlush(); @@ -430,8 +433,8 @@ public class RecordAccumulatorTest { int numRecords = 100; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - final RecordAccumulator accum = new RecordAccumulator(128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, - CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null); + final RecordAccumulator accum = createTestRecordAccumulator( + 128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { @@ -469,8 +472,8 @@ public class RecordAccumulatorTest { int numRecords = 100; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); - final RecordAccumulator accum = new RecordAccumulator(128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, - CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null); + final RecordAccumulator accum = createTestRecordAccumulator( + 128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs); final KafkaException cause = new KafkaException(); class TestCallback implements Callback { @@ -515,8 +518,8 @@ public class RecordAccumulatorTest { // test case assumes that the records do not fill the batch completely int batchSize = 1025; - RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, - CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null); + RecordAccumulator accum = createTestRecordAccumulator( + batchSize + 
DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs); int appends = expectedNumAppends(batchSize); // Test batches not in retry @@ -585,8 +588,8 @@ public class RecordAccumulatorTest { // test case assumes that the records do not fill the batch completely int batchSize = 1025; - RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, - CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions(), null); + RecordAccumulator accum = createTestRecordAccumulator( + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, 10); int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); @@ -622,7 +625,7 @@ public class RecordAccumulatorTest { int batchSize = 1025; apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2)))); - RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, + RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, 10, 100L, metrics, time, apiVersions, new TransactionManager()); accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0); } @@ -630,8 +633,8 @@ public class RecordAccumulatorTest { @Test public void testSplitAndReenqueue() throws ExecutionException, InterruptedException { long now = time.milliseconds(); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10, 100L, metrics, time, - new ApiVersions(), null); + RecordAccumulator accum = createTestRecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10); + // Create a big batch ByteBuffer buffer = ByteBuffer.allocate(4096); MemoryRecordsBuilder builder = 
MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); @@ -689,8 +692,7 @@ public class RecordAccumulatorTest { // First set the compression ratio estimation to be good. CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f); - RecordAccumulator accum = new RecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L, 100L, - metrics, time, new ApiVersions(), null); + RecordAccumulator accum = createTestRecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L); int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20); assertTrue("There should be some split batches", numSplitBatches > 0); // Drain all the split batches. @@ -715,8 +717,7 @@ public class RecordAccumulatorTest { final int batchSize = 1024; final int numMessages = 1000; - RecordAccumulator accum = new RecordAccumulator(batchSize, 3 * 1024, CompressionType.GZIP, 10, 100L, - metrics, time, new ApiVersions(), null); + RecordAccumulator accum = createTestRecordAccumulator(batchSize, 3 * 1024, CompressionType.GZIP, 10); // Adjust the high and low compression ratio message percentage for (int goodCompRatioPercentage = 1; goodCompRatioPercentage < 100; goodCompRatioPercentage++) { int numSplit = 0; @@ -835,4 +836,21 @@ public class RecordAccumulatorTest { size += recordSize; } } + + /** + * Return a test RecordAccumulator instance + */ + private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) { + return new RecordAccumulator( + logContext, + batchSize, + totalSize, + type, + lingerMs, + 100L, + metrics, + time, + new ApiVersions(), + null); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index b6a09ae..e66cce0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -55,6 +55,7 @@ import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.DelayedReceive; import org.apache.kafka.test.MockSelector; @@ -103,6 +104,7 @@ public class SenderTest { private Metrics metrics = null; private RecordAccumulator accumulator = null; private Sender sender = null; + private final LogContext loggerFactory = new LogContext(); @Before public void setup() { @@ -275,7 +277,7 @@ public class SenderTest { int maxRetries = 1; Metrics m = new Metrics(); try { - Sender sender = new Sender(client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, + Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50, null, apiVersions); // do a successful retry Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; @@ -322,7 +324,7 @@ public class SenderTest { int maxRetries = 1; Metrics m = new Metrics(); try { - Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, + Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50, null, apiVersions); // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 Cluster 
cluster1 = TestUtils.clusterWith(2, "test", 2); @@ -574,7 +576,7 @@ public class SenderTest { int maxRetries = 10; Metrics m = new Metrics(); - Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, + Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; @@ -615,7 +617,7 @@ public class SenderTest { int maxRetries = 10; Metrics m = new Metrics(); - Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, + Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; @@ -652,7 +654,7 @@ public class SenderTest { int maxRetries = 10; Metrics m = new Metrics(); - Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, + Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; @@ -702,9 +704,9 @@ public class SenderTest { // Set a good compression ratio. 
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f); try (Metrics m = new Metrics()) { - accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time, + accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time, new ApiVersions(), txnManager); - Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, + Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions()); // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 Cluster cluster1 = TestUtils.clusterWith(2, topic, 2); @@ -822,9 +824,9 @@ public class SenderTest { metricTags.put("client-id", CLIENT_ID); MetricConfig metricConfig = new MetricConfig().tags(metricTags); this.metrics = new Metrics(metricConfig, time); - this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, + this.accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager); - this.sender = new Sender(this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, + this.sender = new Sender(loggerFactory, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, this.metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 4fbcd96..282d91b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -59,6 +59,7 @@ import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.requests.TxnOffsetCommitRequest; import org.apache.kafka.common.requests.TxnOffsetCommitResponse; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; import org.junit.Before; @@ -110,6 +111,7 @@ public class TransactionManagerTest { private Sender sender = null; private TransactionManager transactionManager = null; private Node brokerNode = null; + private final LogContext logContext = new LogContext(); @Before public void setup() { @@ -120,8 +122,8 @@ public class TransactionManagerTest { this.brokerNode = new Node(0, "localhost", 2211); this.transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS); Metrics metrics = new Metrics(metricConfig, time); - this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager); - this.sender = new Sender(this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, + this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager); + this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, 
apiVersions); this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds()); client.setNode(brokerNode);