Repository: kafka
Updated Branches:
  refs/heads/trunk c4d629a0b -> 607c3c21f


KAFKA-5755; KafkaProducer should be refactored to use LogContext

With LogContext, every log entry emitted by the producer is automatically prefixed with the client id and the transactional id.
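
For illustration only (not part of the patch), here is a minimal sketch of how a logger obtained from a LogContext picks up the prefix. The class name, prefix values, and log message below are made up for the example; the LogContext constructor and logger(...) call are the ones this change uses:

    import org.apache.kafka.common.utils.LogContext;
    import org.slf4j.Logger;

    public class LogContextExample {
        public static void main(String[] args) {
            // The prefix passed to LogContext is prepended to every message
            // logged through loggers created from it.
            LogContext logContext =
                    new LogContext("[Producer clientId=producer-1, transactionalId=my-txn] ");
            Logger log = logContext.logger(LogContextExample.class);

            // With an SLF4J binding on the classpath, this prints something like:
            // [Producer clientId=producer-1, transactionalId=my-txn] Kafka producer started
            log.info("Kafka producer started");
        }
    }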

Author: huxihx <huxi...@hotmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3703 from huxihx/KAFKA-5755


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/607c3c21
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/607c3c21
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/607c3c21

Branch: refs/heads/trunk
Commit: 607c3c21f652a8f911d1efb8374d2eec313a4e2d
Parents: c4d629a
Author: huxihx <huxi...@hotmail.com>
Authored: Fri Aug 25 10:38:15 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Aug 25 10:42:40 2017 -0700

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |  2 +
 .../kafka/clients/producer/KafkaProducer.java   | 38 +++++----
 .../producer/internals/RecordAccumulator.java   | 10 ++-
 .../clients/producer/internals/Sender.java      |  8 +-
 .../internals/RecordAccumulatorTest.java        | 86 ++++++++++++--------
 .../clients/producer/internals/SenderTest.java  | 20 +++--
 .../internals/TransactionManagerTest.java       |  6 +-
 7 files changed, 103 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7f3c4b6..e4c244f 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -35,6 +35,8 @@
               files="ConfigDef.java"/>
     <suppress checks="ParameterNumber"
               files="DefaultRecordBatch.java"/>
+    <suppress checks="ParameterNumber"
+              files="Sender.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
               
files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|KafkaAdminClient).java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ce2efba..ba0d848 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -58,9 +58,9 @@ import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
@@ -224,7 +224,7 @@ import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.e
  */
 public class KafkaProducer<K, V> implements Producer<K, V> {
 
-    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
+    private final Logger log;
     private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.producer";
     public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
@@ -305,13 +305,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     @SuppressWarnings("unchecked")
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         try {
-            log.trace("Starting the Kafka producer");
             Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
             this.time = Time.SYSTEM;
             clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "producer-" + 
PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
+
+            String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
+                    (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
+            LogContext logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
+            log = logContext.logger(KafkaProducer.class);
+            log.trace("Starting the Kafka producer");
+
             Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
@@ -354,13 +360,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
             this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
             this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-            this.transactionManager = configureTransactionState(config);
-            int retries = configureRetries(config, transactionManager != null);
-            int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
-            short acks = configureAcks(config, transactionManager != null);
+            this.transactionManager = configureTransactionState(config, log);
+            int retries = configureRetries(config, transactionManager != null, log);
+            int maxInflightRequests = configureInflightRequests(config, transactionManager != null, log);
+            short acks = configureAcks(config, transactionManager != null, log);
 
             this.apiVersions = new ApiVersions();
-            this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
+            this.accumulator = new RecordAccumulator(logContext,
+                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                     this.totalMemorySize,
                     this.compressionType,
                     config.getLong(ProducerConfig.LINGER_MS_CONFIG),
@@ -388,7 +395,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     true,
                     apiVersions,
                     throttleTimeSensor);
-            this.sender = new Sender(client,
+            this.sender = new Sender(logContext,
+                    client,
                     this.metadata,
                     this.accumulator,
                     maxInflightRequests == 1,
@@ -407,7 +415,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.errors = this.metrics.sensor("errors");
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
-            log.debug("Kafka producer with client id {} created", clientId);
+            log.debug("Kafka producer started");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
             close(0, TimeUnit.MILLISECONDS, true);
@@ -416,7 +424,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    private static TransactionManager configureTransactionState(ProducerConfig config) {
+    private static TransactionManager configureTransactionState(ProducerConfig config, Logger log) {
 
         TransactionManager transactionManager = null;
 
@@ -450,7 +458,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         return transactionManager;
     }
 
-    private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled) {
+    private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
         boolean userConfiguredRetries = false;
         if (config.originals().containsKey(ProducerConfig.RETRIES_CONFIG)) {
             userConfiguredRetries = true;
@@ -468,7 +476,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         return config.getInt(ProducerConfig.RETRIES_CONFIG);
     }
 
-    private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
+    private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
         boolean userConfiguredInflights = false;
         if (config.originals().containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
             userConfiguredInflights = true;
@@ -484,7 +492,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
     }
 
-    private static short configureAcks(ProducerConfig config, boolean idempotenceEnabled) {
+    private static short configureAcks(ProducerConfig config, boolean idempotenceEnabled, Logger log) {
         boolean userConfiguredAcks = false;
         short acks = (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG));
         if (config.originals().containsKey(ProducerConfig.ACKS_CONFIG)) {
@@ -1055,7 +1063,7 @@ public class KafkaProducer<K, V> {
         ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", 
firstException);
         ClientUtils.closeQuietly(partitioner, "producer partitioner", 
firstException);
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
-        log.debug("Kafka producer with client id {} has been closed", 
clientId);
+        log.debug("Kafka producer has been closed");
         if (firstException.get() != null && !swallowException)
             throw new KafkaException("Failed to close kafka producer", 
firstException.get());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 9b9aa02..38b5e51 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -39,10 +39,10 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -67,8 +67,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public final class RecordAccumulator {
 
-    private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
-
+    private final Logger log;
     private volatile boolean closed;
     private final AtomicInteger flushesInProgress;
     private final AtomicInteger appendsInProgress;
@@ -89,6 +88,7 @@ public final class RecordAccumulator {
     /**
      * Create a new record accumulator
      *
+     * @param logContext The log context used for logging
      * @param batchSize The size to use when allocating {@link MemoryRecords} instances
      * @param totalSize The maximum memory the record accumulator can use.
      * @param compression The compression codec for the records
@@ -103,7 +103,8 @@ public final class RecordAccumulator {
      * @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence
      *                           numbers per partition.
      */
-    public RecordAccumulator(int batchSize,
+    public RecordAccumulator(LogContext logContext,
+                             int batchSize,
                              long totalSize,
                              CompressionType compression,
                              long lingerMs,
@@ -112,6 +113,7 @@ public final class RecordAccumulator {
                              Time time,
                              ApiVersions apiVersions,
                              TransactionManager transactionManager) {
+        this.log = logContext.logger(RecordAccumulator.class);
         this.drainIndex = 0;
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 411282b..9c3b4d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -53,10 +53,10 @@ import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -73,7 +73,7 @@ import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
  */
 public class Sender implements Runnable {
 
-    private static final Logger log = LoggerFactory.getLogger(Sender.class);
+    private final Logger log;
 
     /* the state of each nodes connection */
     private final KafkaClient client;
@@ -120,7 +120,8 @@ public class Sender implements Runnable {
     /* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
     private final TransactionManager transactionManager;
 
-    public Sender(KafkaClient client,
+    public Sender(LogContext logContext,
+                  KafkaClient client,
                   Metadata metadata,
                   RecordAccumulator accumulator,
                   boolean guaranteeMessageOrder,
@@ -133,6 +134,7 @@ public class Sender implements Runnable {
                   long retryBackoffMs,
                   TransactionManager transactionManager,
                   ApiVersions apiVersions) {
+        this.log = logContext.logger(Sender.class);
         this.client = client;
         this.accumulator = accumulator;
         this.metadata = metadata;

http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 99e7557..d486c10 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
@@ -88,6 +89,7 @@ public class RecordAccumulatorTest {
             Collections.<String>emptySet(), Collections.<String>emptySet());
     private Metrics metrics = new Metrics(time);
     private final long maxBlockTimeMs = 1000;
+    private final LogContext logContext = new LogContext();
 
     @After
     public void teardown() {
@@ -101,8 +103,8 @@ public class RecordAccumulatorTest {
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
 
-        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize,
-                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null);
+        RecordAccumulator accum = createTestRecordAccumulator(
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, CompressionType.NONE, 10L);
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             // append to the first batch
@@ -150,8 +152,8 @@ public class RecordAccumulatorTest {
     private void testAppendLarge(CompressionType compressionType) throws Exception {
         int batchSize = 512;
         byte[] value = new byte[2 * batchSize];
-        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                compressionType, 0L, 100L, metrics, time, new ApiVersions(), null);
+        RecordAccumulator accum = createTestRecordAccumulator(
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L);
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -189,8 +191,8 @@ public class RecordAccumulatorTest {
         apiVersions.update(node1.idString(), NodeApiVersions.create(Collections.singleton(
                 new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
 
-        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                compressionType, 0L, 100L, metrics, time, apiVersions, null);
+        RecordAccumulator accum = createTestRecordAccumulator(
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0L);
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -213,8 +215,8 @@ public class RecordAccumulatorTest {
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
+        RecordAccumulator accum = createTestRecordAccumulator(
+                1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs);
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
@@ -232,8 +234,8 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null);
+        RecordAccumulator accum = createTestRecordAccumulator(
+                1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 10L);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -252,8 +254,8 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
+        final RecordAccumulator accum = createTestRecordAccumulator(
+                1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0L);
         List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -297,8 +299,8 @@ public class RecordAccumulatorTest {
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
 
-        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
+        RecordAccumulator accum = createTestRecordAccumulator(
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
         // Just short of going over the limit so we trigger linger time
         int appends = expectedNumAppends(batchSize);
 
@@ -332,7 +334,7 @@ public class RecordAccumulatorTest {
     public void testRetryBackoff() throws Exception {
         long lingerMs = Long.MAX_VALUE / 4;
         long retryBackoffMs = Long.MAX_VALUE / 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
+        final RecordAccumulator accum = new RecordAccumulator(logContext, 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
                 CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
 
         long now = time.milliseconds();
@@ -370,8 +372,9 @@ public class RecordAccumulatorTest {
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
+        final RecordAccumulator accum = createTestRecordAccumulator(
+                4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
+
         for (int i = 0; i < 100; i++) {
             accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
             assertTrue(accum.hasIncomplete());
@@ -409,8 +412,8 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testAwaitFlushComplete() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
-                CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions(), null);
+        RecordAccumulator accum = createTestRecordAccumulator(
+                4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE);
         accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
 
         accum.beginFlush();
@@ -430,8 +433,8 @@ public class RecordAccumulatorTest {
         int numRecords = 100;
 
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
-        final RecordAccumulator accum = new RecordAccumulator(128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
+        final RecordAccumulator accum = createTestRecordAccumulator(
+                128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
         class TestCallback implements Callback {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -469,8 +472,8 @@ public class RecordAccumulatorTest {
         int numRecords = 100;
 
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
-        final RecordAccumulator accum = new RecordAccumulator(128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
+        final RecordAccumulator accum = createTestRecordAccumulator(
+                128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs);
         final KafkaException cause = new KafkaException();
 
         class TestCallback implements Callback {
@@ -515,8 +518,8 @@ public class RecordAccumulatorTest {
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
 
-        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
-                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
+        RecordAccumulator accum = createTestRecordAccumulator(
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs);
         int appends = expectedNumAppends(batchSize);
 
         // Test batches not in retry
@@ -585,8 +588,8 @@ public class RecordAccumulatorTest {
         // test case assumes that the records do not fill the batch completely
         int batchSize = 1025;
 
-        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
-                CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions(), null);
+        RecordAccumulator accum = createTestRecordAccumulator(
+                batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, 10);
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
@@ -622,7 +625,7 @@ public class RecordAccumulatorTest {
         int batchSize = 1025;
         apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new 
ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id,
                 (short) 0, (short) 2))));
-        RecordAccumulator accum = new RecordAccumulator(batchSize + 
DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
+        RecordAccumulator accum = new RecordAccumulator(logContext, batchSize 
+ DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
                 CompressionType.NONE, 10, 100L, metrics, time, apiVersions, 
new TransactionManager());
         accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0);
     }
@@ -630,8 +633,8 @@ public class RecordAccumulatorTest {
     @Test
     public void testSplitAndReenqueue() throws ExecutionException, InterruptedException {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10, 100L, metrics, time,
-                                                        new ApiVersions(), null);
+        RecordAccumulator accum = createTestRecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10);
+
         // Create a big batch
         ByteBuffer buffer = ByteBuffer.allocate(4096);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
@@ -689,8 +692,7 @@ public class RecordAccumulatorTest {
 
         // First set the compression ratio estimation to be good.
         CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
-        RecordAccumulator accum = new RecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L, 100L,
-                                                        metrics, time, new ApiVersions(), null);
+        RecordAccumulator accum = createTestRecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L);
         int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20);
         assertTrue("There should be some split batches", numSplitBatches > 0);
         // Drain all the split batches.
@@ -715,8 +717,7 @@ public class RecordAccumulatorTest {
         final int batchSize = 1024;
         final int numMessages = 1000;
 
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 3 * 1024, CompressionType.GZIP, 10, 100L,
-                                                        metrics, time, new ApiVersions(), null);
+        RecordAccumulator accum = createTestRecordAccumulator(batchSize, 3 * 1024, CompressionType.GZIP, 10);
         // Adjust the high and low compression ratio message percentage
         for (int goodCompRatioPercentage = 1; goodCompRatioPercentage < 100; goodCompRatioPercentage++) {
             int numSplit = 0;
@@ -835,4 +836,21 @@ public class RecordAccumulatorTest {
             size += recordSize;
         }
     }
+
+    /**
+     * Return a test RecordAccumulator instance
+     */
+    private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) {
+        return new RecordAccumulator(
+                logContext,
+                batchSize,
+                totalSize,
+                type,
+                lingerMs,
+                100L,
+                metrics,
+                time,
+                new ApiVersions(),
+                null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index b6a09ae..e66cce0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -55,6 +55,7 @@ import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
@@ -103,6 +104,7 @@ public class SenderTest {
     private Metrics metrics = null;
     private RecordAccumulator accumulator = null;
     private Sender sender = null;
+    private final LogContext loggerFactory = new LogContext();
 
     @Before
     public void setup() {
@@ -275,7 +277,7 @@ public class SenderTest {
         int maxRetries = 1;
         Metrics m = new Metrics();
         try {
-            Sender sender = new Sender(client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+            Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
                     maxRetries, m, time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -322,7 +324,7 @@ public class SenderTest {
         int maxRetries = 1;
         Metrics m = new Metrics();
         try {
-            Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+            Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     m, time, REQUEST_TIMEOUT, 50, null, apiVersions);
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
@@ -574,7 +576,7 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
-        Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+        Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                 m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -615,7 +617,7 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
-        Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+        Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                 m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -652,7 +654,7 @@ public class SenderTest {
 
         int maxRetries = 10;
         Metrics m = new Metrics();
-        Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+        Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                 m, time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -702,9 +704,9 @@ public class SenderTest {
         // Set a good compression ratio.
         CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
         try (Metrics m = new Metrics()) {
-            accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
+            accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time,
                     new ApiVersions(), txnManager);
-            Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
+            Sender sender = new Sender(loggerFactory, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
                     m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions());
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, topic, 2);
@@ -822,9 +824,9 @@ public class SenderTest {
         metricTags.put("client-id", CLIENT_ID);
         MetricConfig metricConfig = new MetricConfig().tags(metricTags);
         this.metrics = new Metrics(metricConfig, time);
-        this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
+        this.accumulator = new RecordAccumulator(loggerFactory, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time,
                 apiVersions, transactionManager);
-        this.sender = new Sender(this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
+        this.sender = new Sender(loggerFactory, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
                 MAX_RETRIES, this.metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 4fbcd96..282d91b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -59,6 +59,7 @@ import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -110,6 +111,7 @@ public class TransactionManagerTest {
     private Sender sender = null;
     private TransactionManager transactionManager = null;
     private Node brokerNode = null;
+    private final LogContext logContext = new LogContext();
 
     @Before
     public void setup() {
@@ -120,8 +122,8 @@ public class TransactionManagerTest {
         this.brokerNode = new Node(0, "localhost", 2211);
         this.transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS);
         Metrics metrics = new Metrics(metricConfig, time);
-        this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
-        this.sender = new Sender(this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
+        this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
+        this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
                 MAX_RETRIES, metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
         this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
         client.setNode(brokerNode);
