http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index ab81bfe..d238093 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -56,14 +56,14 @@ public class MemoryRecordsBuilder { private final int initPos; private final long baseOffset; private final long logAppendTime; - private final long producerId; - private final short producerEpoch; - private final int baseSequence; private final boolean isTransactional; private final int partitionLeaderEpoch; private final int writeLimit; private final int initialCapacity; + private long producerId; + private short producerEpoch; + private int baseSequence; private long writtenUncompressed = 0; private int numRecords = 0; private float compressionRate = 1; @@ -193,6 +193,19 @@ public class MemoryRecordsBuilder { return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset); } + public void setProducerState(long pid, short epoch, int baseSequence) { + if (isClosed()) { + // Sequence numbers are assigned when the batch is closed while the accumulator is being drained. + // If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will + // be re-queued. In this case, we should not attempt to set the state again, since changing the pid and sequence + // once a batch has been sent to the broker risks introducing duplicates. + throw new IllegalStateException("Trying to set producer state of an already closed batch. 
This indicates a bug on the client."); + } + this.producerId = pid; + this.producerEpoch = epoch; + this.baseSequence = baseSequence; + } + public void close() { if (builtRecords != null) return; @@ -577,4 +590,11 @@ public class MemoryRecordsBuilder { this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; } } + + /** + * Return the ProducerId (PID) of the RecordBatches created by this builder. + */ + public long producerId() { + return this.producerId; + } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index 90f1486..ae4a225 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -146,6 +146,11 @@ public interface RecordBatch extends Iterable<Record> { short producerEpoch(); /** + * Does the batch have a valid producer id set. + */ + boolean hasProducerId(); + + /** * Get the first sequence number of this record batch. * @return The first sequence number or -1 if there is none */ http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 3a99a8a..1638556 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -171,6 +171,9 @@ public abstract class AbstractRequest extends AbstractRequestResponse { case DELETE_RECORDS: request = new DeleteRecordsRequest(struct, version); break; + case INIT_PRODUCER_ID: + request = new InitPidRequest(struct, version); + break; default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " + "code should be updated to do so.", apiKey)); http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index a5d0dc4..314aa42 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -93,6 +93,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse { return new DeleteTopicsResponse(struct); case DELETE_RECORDS: return new DeleteRecordsResponse(struct); + case INIT_PRODUCER_ID: + return new InitPidResponse(struct); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " + "code should be updated to do so.", apiKey)); http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java new file mode 100644 index 0000000..284107f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class InitPidRequest extends AbstractRequest { + private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; + + private final String transactionalId; + + public static class Builder extends AbstractRequest.Builder<InitPidRequest> { + private final String transactionalId; + public Builder(String transactionalId) { + super(ApiKeys.INIT_PRODUCER_ID); + if (transactionalId != null && transactionalId.isEmpty()) + throw new IllegalArgumentException("Must set either a null or a non-empty transactional id."); + this.transactionalId = transactionalId; + } + + @Override + public InitPidRequest build(short version) { + return new InitPidRequest(this.transactionalId, version); + } + + @Override + public String toString() { + return "(type=InitPidRequest)"; + } + + } + + public InitPidRequest(Struct struct, short version) { + super(version); + this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); + } + + private InitPidRequest(String transactionalId, short version) { + super(version); + this.transactionalId = transactionalId; + } + + @Override + public AbstractResponse getErrorResponse(Throwable e) { + return new InitPidResponse(Errors.forException(e)); + } + + public static InitPidRequest parse(ByteBuffer buffer, short version) { + return new 
InitPidRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version); + } + + public String transactionalId() { + return transactionalId; + } + + @Override + protected Struct toStruct() { + Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version())); + struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); + return struct; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java new file mode 100644 index 0000000..ee92375 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.RecordBatch; + +import java.nio.ByteBuffer; + +public class InitPidResponse extends AbstractResponse { + /** + * Possible Error codes: + * OK + * + */ + private static final String PRODUCER_ID_KEY_NAME = "pid"; + private static final String EPOCH_KEY_NAME = "epoch"; + private static final String ERROR_CODE_KEY_NAME = "error_code"; + private final Errors error; + private final long producerId; + private final short epoch; + + public InitPidResponse(Errors error, long producerId, short epoch) { + this.error = error; + this.producerId = producerId; + this.epoch = epoch; + } + + public InitPidResponse(Struct struct) { + this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); + this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); + this.epoch = struct.getShort(EPOCH_KEY_NAME); + } + + public InitPidResponse(Errors errors) { + this(errors, RecordBatch.NO_PRODUCER_ID, (short) 0); + } + + public long producerId() { + return producerId; + } + + public Errors error() { + return error; + } + + public short epoch() { + return epoch; + } + + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version)); + struct.set(PRODUCER_ID_KEY_NAME, producerId); + struct.set(EPOCH_KEY_NAME, epoch); + struct.set(ERROR_CODE_KEY_NAME, error.code()); + return struct; + } + + public static InitPidResponse parse(ByteBuffer buffer, short version) { + return new InitPidResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java index 50c90a8..6efe311 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java @@ -31,6 +31,16 @@ public final class ByteUtils { private ByteUtils() {} /** + * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes + * + * @param buffer The buffer to read from + * @return The integer read, as a long to avoid signedness + */ + public static long readUnsignedInt(ByteBuffer buffer) { + return buffer.getInt() & 0xffffffffL; + } + + /** * Read an unsigned integer from the given position without modifying the buffers position * * @param buffer the buffer to read from http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 9cc863b..9117e16 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -17,18 +17,23 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.TransactionState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import 
org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.DefaultRecord; import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.junit.After; @@ -89,7 +94,7 @@ public class RecordAccumulatorTest { int batchSize = 1025; RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize, - CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions()); + CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null); int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { // append to the first batch @@ -108,7 +113,6 @@ public class RecordAccumulatorTest { Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1); assertEquals(2, partitionBatches.size()); Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator(); - assertFalse(partitionBatchesIterator.next().isWritable()); assertTrue(partitionBatchesIterator.next().isWritable()); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); @@ -129,7 +133,7 @@ public class RecordAccumulatorTest { public void testAppendLarge() throws Exception { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions()); + CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null); 
accum.append(tp1, 0L, key, new byte[2 * batchSize], null, maxBlockTimeMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @@ -138,7 +142,7 @@ public class RecordAccumulatorTest { public void testLinger() throws Exception { long lingerMs = 10L; RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions()); + CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null); accum.append(tp1, 0L, key, value, null, maxBlockTimeMs); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); @@ -157,7 +161,7 @@ public class RecordAccumulatorTest { @Test public void testPartialDrain() throws Exception { RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions()); + CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null); int appends = 1024 / msgSize + 1; List<TopicPartition> partitions = asList(tp1, tp2); for (TopicPartition tp : partitions) { @@ -177,7 +181,7 @@ public class RecordAccumulatorTest { final int msgs = 10000; final int numParts = 2; final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions()); + CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null); List<Thread> threads = new ArrayList<>(); for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { @@ -222,7 +226,7 @@ public class RecordAccumulatorTest { int batchSize = 1025; RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, - CompressionType.NONE, lingerMs, 100L, metrics, 
time, new ApiVersions()); + CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null); // Just short of going over the limit so we trigger linger time int appends = expectedNumAppends(batchSize); @@ -257,7 +261,7 @@ public class RecordAccumulatorTest { long lingerMs = Long.MAX_VALUE / 4; long retryBackoffMs = Long.MAX_VALUE / 2; final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions()); + CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null); long now = time.milliseconds(); accum.append(tp1, 0L, key, value, null, maxBlockTimeMs); @@ -295,7 +299,7 @@ public class RecordAccumulatorTest { public void testFlush() throws Exception { long lingerMs = Long.MAX_VALUE; final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, - CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions()); + CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null); for (int i = 0; i < 100; i++) accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, maxBlockTimeMs); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); @@ -329,7 +333,7 @@ public class RecordAccumulatorTest { @Test public void testAwaitFlushComplete() throws Exception { RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, - CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions()); + CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions(), null); accum.append(new TopicPartition(topic, 0), 0L, key, value, null, maxBlockTimeMs); accum.beginFlush(); @@ -349,7 +353,7 @@ public class RecordAccumulatorTest { long lingerMs = Long.MAX_VALUE; final AtomicInteger numExceptionReceivedInCallback = new 
AtomicInteger(0); final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, - CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions()); + CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { @@ -378,7 +382,7 @@ public class RecordAccumulatorTest { int batchSize = 1025; RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, - CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions()); + CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null); int appends = expectedNumAppends(batchSize); // Test batches not in retry @@ -449,7 +453,7 @@ public class RecordAccumulatorTest { int messagesPerBatch = expectedNumAppends(1024); final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions()); + CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null); final AtomicInteger expiryCallbackCount = new AtomicInteger(); final AtomicReference<Exception> unexpectedException = new AtomicReference<>(); Callback callback = new Callback() { @@ -490,7 +494,7 @@ public class RecordAccumulatorTest { int batchSize = 1025; RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, - CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions()); + CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions(), null); int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { accum.append(tp1, 0L, key, value, null, maxBlockTimeMs); @@ -519,6 +523,18 @@ public class RecordAccumulatorTest { assertTrue("The batch 
should have been drained.", drained.get(node1.id()).size() > 0); } + @Test(expected = UnsupportedVersionException.class) + public void testIdempotenceWithOldMagic() throws InterruptedException { + // Simulate talking to an older broker, i.e. one which supports a lower magic. + ApiVersions apiVersions = new ApiVersions(); + int batchSize = 1025; + apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, + (short) 0, (short) 2)))); + RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, + CompressionType.NONE, 10, 100L, metrics, time, apiVersions, new TransactionState(time)); + accum.append(tp1, 0L, key, value, null, 0); + } + /** * Return the offset delta. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 0dea6b6..0d19aa0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.TransactionState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; @@ -32,11 +33,14 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import 
org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.ProduceRequest; +import org.apache.kafka.common.requests.InitPidRequest; +import org.apache.kafka.common.requests.InitPidResponse; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; @@ -46,6 +50,7 @@ import org.junit.Test; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -81,24 +86,7 @@ public class SenderTest { @Before public void setup() { - Map<String, String> metricTags = new LinkedHashMap<>(); - metricTags.put("client-id", CLIENT_ID); - MetricConfig metricConfig = new MetricConfig().tags(metricTags); - metrics = new Metrics(metricConfig, time); - accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions); - sender = new Sender(client, - metadata, - this.accumulator, - true, - MAX_REQUEST_SIZE, - ACKS_ALL, - MAX_RETRIES, - metrics, - time, - REQUEST_TIMEOUT, - apiVersions); - - metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); + setupWithTransactionState(null); } @After @@ -244,16 +232,19 @@ public class SenderTest { Metrics m = new Metrics(); try { Sender sender = new Sender(client, - metadata, - this.accumulator, - false, - MAX_REQUEST_SIZE, - ACKS_ALL, - maxRetries, - m, - time, - REQUEST_TIMEOUT, - apiVersions); + metadata, + this.accumulator, + false, + MAX_REQUEST_SIZE, + ACKS_ALL, + maxRetries, + m, + time, + REQUEST_TIMEOUT, + 50, + null, + apiVersions + ); // do a successful 
retry Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect @@ -300,17 +291,19 @@ public class SenderTest { Metrics m = new Metrics(); try { Sender sender = new Sender(client, - metadata, - this.accumulator, - true, - MAX_REQUEST_SIZE, - ACKS_ALL, - maxRetries, - m, - time, - REQUEST_TIMEOUT, - apiVersions); - + metadata, + this.accumulator, + true, + MAX_REQUEST_SIZE, + ACKS_ALL, + maxRetries, + m, + time, + REQUEST_TIMEOUT, + 50, + null, + apiVersions + ); // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 Cluster cluster1 = TestUtils.clusterWith(2, "test", 2); metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds()); @@ -381,6 +374,164 @@ public class SenderTest { assertTrue("Request should be completed", future.isDone()); } + @Test + public void testInitPidRequest() throws Exception { + final long producerId = 343434L; + TransactionState transactionState = new TransactionState(new MockTime()); + setupWithTransactionState(transactionState); + client.setNode(new Node(1, "localhost", 33343)); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + return body instanceof InitPidRequest; + } + }, new InitPidResponse(Errors.NONE, producerId, (short) 0)); + sender.run(time.milliseconds()); + assertTrue(transactionState.hasPid()); + assertEquals(producerId, transactionState.pidAndEpoch().producerId); + assertEquals((short) 0, transactionState.pidAndEpoch().epoch); + } + + @Test + public void testSequenceNumberIncrement() throws InterruptedException { + final long producerId = 343434L; + TransactionState transactionState = new TransactionState(new MockTime()); + transactionState.setPidAndEpoch(producerId, (short) 0); + setupWithTransactionState(transactionState); + client.setNode(new Node(1, "localhost", 33343)); + + int 
maxRetries = 10; + Metrics m = new Metrics(); + Sender sender = new Sender(client, + metadata, + this.accumulator, + true, + MAX_REQUEST_SIZE, + ACKS_ALL, + maxRetries, + m, + time, + REQUEST_TIMEOUT, + 50, + transactionState, + apiVersions + ); + + Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + if (body instanceof ProduceRequest) { + ProduceRequest request = (ProduceRequest) body; + MemoryRecords records = request.partitionRecordsOrFail().get(tp0); + Iterator<MutableRecordBatch> batchIterator = records.batches().iterator(); + assertTrue(batchIterator.hasNext()); + RecordBatch batch = batchIterator.next(); + assertFalse(batchIterator.hasNext()); + assertEquals(0, batch.baseSequence()); + assertEquals(producerId, batch.producerId()); + assertEquals(0, batch.producerEpoch()); + return true; + } + return false; + } + }, produceResponse(tp0, 0, Errors.NONE, 0)); + + sender.run(time.milliseconds()); // connect. + sender.run(time.milliseconds()); // send. 
+ + sender.run(time.milliseconds()); // receive response + assertTrue(responseFuture.isDone()); + assertEquals((long) transactionState.sequenceNumber(tp0), 1L); + } + + @Test + public void testAbortRetryWhenPidChanges() throws InterruptedException { + final long producerId = 343434L; + TransactionState transactionState = new TransactionState(new MockTime()); + transactionState.setPidAndEpoch(producerId, (short) 0); + setupWithTransactionState(transactionState); + client.setNode(new Node(1, "localhost", 33343)); + + int maxRetries = 10; + Metrics m = new Metrics(); + Sender sender = new Sender(client, + metadata, + this.accumulator, + true, + MAX_REQUEST_SIZE, + ACKS_ALL, + maxRetries, + m, + time, + REQUEST_TIMEOUT, + 50, + transactionState, + apiVersions + ); + + Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); // connect. + sender.run(time.milliseconds()); // send. + String id = client.requests().peek().destination(); + Node node = new Node(Integer.valueOf(id), "localhost", 0); + assertEquals(1, client.inFlightRequestCount()); + assertTrue("Client ready status should be true", client.isReady(node, 0L)); + client.disconnect(id); + assertEquals(0, client.inFlightRequestCount()); + assertFalse("Client ready status should be false", client.isReady(node, 0L)); + + transactionState.setPidAndEpoch(producerId + 1, (short) 0); + sender.run(time.milliseconds()); // receive error + sender.run(time.milliseconds()); // reconnect + sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors. 
+ assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount()); + + KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, "")); + assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0); + + assertTrue(responseFuture.isDone()); + assertEquals((long) transactionState.sequenceNumber(tp0), 0L); + } + + @Test + public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException { + final long producerId = 343434L; + TransactionState transactionState = new TransactionState(new MockTime()); + transactionState.setPidAndEpoch(producerId, (short) 0); + setupWithTransactionState(transactionState); + client.setNode(new Node(1, "localhost", 33343)); + + int maxRetries = 10; + Metrics m = new Metrics(); + Sender sender = new Sender(client, + metadata, + this.accumulator, + true, + MAX_REQUEST_SIZE, + ACKS_ALL, + maxRetries, + m, + time, + REQUEST_TIMEOUT, + 50, + transactionState, + apiVersions + ); + + Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); // connect. + sender.run(time.milliseconds()); // send. 
+ + assertEquals(1, client.inFlightRequestCount()); + + client.respond(produceResponse(tp0, 0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0)); + + sender.run(time.milliseconds()); + assertTrue(responseFuture.isDone()); + assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionState.hasPid()); + } + private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception { assertTrue("Request should be completed", future.isDone()); try { @@ -397,4 +548,25 @@ public class SenderTest { return new ProduceResponse(partResp, throttleTimeMs); } + private void setupWithTransactionState(TransactionState transactionState) { + Map<String, String> metricTags = new LinkedHashMap<>(); + metricTags.put("client-id", CLIENT_ID); + MetricConfig metricConfig = new MetricConfig().tags(metricTags); + this.metrics = new Metrics(metricConfig, time); + this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionState); + this.sender = new Sender(this.client, + this.metadata, + this.accumulator, + true, + MAX_REQUEST_SIZE, + ACKS_ALL, + MAX_RETRIES, + this.metrics, + this.time, + REQUEST_TIMEOUT, + 50, + transactionState, + apiVersions); + this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds()); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java new file mode 100644 index 0000000..a8a1716 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer.internals; + + +import org.apache.kafka.clients.producer.TransactionState; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TransactionStateTest { + + private TopicPartition topicPartition; + + @Before + public void setUp() { + topicPartition = new TopicPartition("topic-0", 0); + } + + @Test(expected = IllegalStateException.class) + public void testInvalidSequenceIncrement() { + TransactionState transactionState = new TransactionState(new MockTime()); + transactionState.incrementSequenceNumber(topicPartition, 3333); + } + + @Test + public void testDefaultSequenceNumber() { + TransactionState transactionState = new TransactionState(new MockTime()); + assertEquals((int) transactionState.sequenceNumber(topicPartition), 0); + transactionState.incrementSequenceNumber(topicPartition, 3); + assertEquals((int) transactionState.sequenceNumber(topicPartition), 3); + } + + + @Test + public void testProducerIdReset() { + TransactionState transactionState = new TransactionState(new MockTime()); + 
assertEquals((int) transactionState.sequenceNumber(topicPartition), 0); + transactionState.incrementSequenceNumber(topicPartition, 3); + assertEquals((int) transactionState.sequenceNumber(topicPartition), 3); + transactionState.resetProducerId(); + assertEquals((int) transactionState.sequenceNumber(topicPartition), 0); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 2dd5ab0..a2c761f 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -355,7 +355,7 @@ public class MemoryRecordsTest { private static class RetainNonNullKeysFilter implements MemoryRecords.RecordFilter { @Override - public boolean shouldRetain(Record record) { + public boolean shouldRetain(RecordBatch batch, Record record) { return record.hasKey(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 2024f90..8a7633e 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -136,6 +136,9 @@ public class RequestResponseTest { checkRequest(createDeleteTopicsRequest()); checkErrorResponse(createDeleteTopicsRequest(), new UnknownServerException()); checkResponse(createDeleteTopicsResponse(), 0); + 
checkRequest(createInitPidRequest()); + checkErrorResponse(createInitPidRequest(), new UnknownServerException()); + checkResponse(createInitPidResponse(), 0); checkOlderFetchVersions(); checkResponse(createMetadataResponse(), 0); checkResponse(createMetadataResponse(), 1); @@ -787,6 +790,14 @@ public class RequestResponseTest { return new DeleteTopicsResponse(errors); } + private InitPidRequest createInitPidRequest() { + return new InitPidRequest.Builder(null).build(); + } + + private InitPidResponse createInitPidResponse() { + return new InitPidResponse(Errors.NONE, 3332, (short) 3); + } + private static class ByteBufferChannel implements GatheringByteChannel { private final ByteBuffer buf; private boolean closed = false; http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java index 0a082fb..ce23a33 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java @@ -65,6 +65,16 @@ public class ByteUtilsTest { } @Test + public void testReadUnsignedInt() { + ByteBuffer buffer = ByteBuffer.allocate(4); + long writeValue = 133444; + ByteUtils.writeUnsignedInt(buffer, writeValue); + buffer.flip(); + long readValue = ByteUtils.readUnsignedInt(buffer); + assertEquals(writeValue, readValue); + } + + @Test public void testWriteUnsignedIntLEToArray() { int value1 = 0x04030201; http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index a2308b2..194cfcc 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -24,9 +24,9 @@ import kafka.cluster.Broker import kafka.common.{KafkaException, TopicAndPartition} import kafka.server.KafkaConfig import kafka.utils._ -import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient} +import org.apache.kafka.clients._ import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector} +import org.apache.kafka.common.network._ import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.requests import org.apache.kafka.common.requests.{UpdateMetadataRequest, _} @@ -180,7 +180,6 @@ class RequestSendThread(val controllerId: Int, def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100)) val QueueItem(apiKey, requestBuilder, callback) = queue.take() - import NetworkClientBlockingOps._ var clientResponse: ClientResponse = null try { lock synchronized { @@ -196,7 +195,7 @@ class RequestSendThread(val controllerId: Int, else { val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder, time.milliseconds(), true) - clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time) + clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time) isSendSuccessful = true } } catch { @@ -233,10 +232,9 @@ class RequestSendThread(val controllerId: Int, } private def brokerReady(): Boolean = { - import NetworkClientBlockingOps._ try { - if (!networkClient.isReady(brokerNode)(time)) { - if (!networkClient.blockingReady(brokerNode, socketTimeoutMs)(time)) + if (!NetworkClientUtils.isReady(networkClient, brokerNode, time.milliseconds())) { + if 
(!NetworkClientUtils.awaitReady(networkClient, brokerNode, time, socketTimeoutMs)) throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms") info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString)) http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index 9d62924..2bc0c21 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -69,7 +69,7 @@ class GroupMetadataManager(val brokerId: Int, private val shuttingDown = new AtomicBoolean(false) /* number of partitions for the consumer metadata topic */ - private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount + private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount /* single-thread scheduler to handle offset/group metadata cache loading and unloading */ private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-") @@ -667,16 +667,11 @@ class GroupMetadataManager(val brokerId: Int, } /** - * Gets the partition count of the offsets topic from ZooKeeper. + * Gets the partition count of the group metadata topic from ZooKeeper. * If the topic does not exist, the configured partition count is returned. 
*/ - private def getOffsetsTopicPartitionCount = { - val topic = Topic.GroupMetadataTopicName - val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic)) - if (topicData(topic).nonEmpty) - topicData(topic).size - else - config.offsetsTopicNumPartitions + private def getGroupMetadataTopicPartitionCount: Int = { + zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName).getOrElse(config.offsetsTopicNumPartitions) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/PidMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/PidMetadata.scala b/core/src/main/scala/kafka/coordinator/PidMetadata.scala new file mode 100644 index 0000000..fa58add --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/PidMetadata.scala @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.coordinator + +import kafka.utils.nonthreadsafe + +@nonthreadsafe +private[coordinator] class PidMetadata(val pid: Long) { + + /* current epoch number of the PID */ + var epoch: Short = 0 + + override def equals(that: Any): Boolean = that match { + case other: PidMetadata => pid == other.pid && epoch == other.epoch + case _ => false + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala new file mode 100644 index 0000000..43b05a4 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator + +import kafka.common.KafkaException +import kafka.utils.{Json, Logging, ZkUtils} + +/** + * Pid manager is part of the transaction coordinator that provides PIDs in a unique way such that the same PID will not be + * assigned twice across multiple transaction coordinators. 
+ * + * Pids are managed via ZooKeeper, where the latest pid block is written on the corresponding ZK path by the manager who + * claims the block, where the written block_start_pid and block_end_pid are both inclusive. + */ +object ProducerIdManager extends Logging { + val CurrentVersion: Long = 1L + val PidBlockSize: Long = 1000L + + def generatePidBlockJson(pidBlock: ProducerIdBlock): String = { + Json.encode(Map("version" -> CurrentVersion, + "broker" -> pidBlock.brokerId, + "block_start" -> pidBlock.blockStartPid.toString, + "block_end" -> pidBlock.blockEndPid.toString) + ) + } + + def parsePidBlockData(jsonData: String): ProducerIdBlock = { + try { + Json.parseFull(jsonData).flatMap { m => + val pidBlockInfo = m.asInstanceOf[Map[String, Any]] + val brokerId = pidBlockInfo("broker").asInstanceOf[Int] + val blockStartPID = pidBlockInfo("block_start").asInstanceOf[String].toLong + val blockEndPID = pidBlockInfo("block_end").asInstanceOf[String].toLong + Some(ProducerIdBlock(brokerId, blockStartPID, blockEndPID)) + }.getOrElse(throw new KafkaException(s"Failed to parse the pid block json $jsonData")) + } catch { + case e: java.lang.NumberFormatException => + // this should never happen: the written data has exceeded long type limit + fatal(s"Read jason data $jsonData contains pids that have exceeded long type limit") + throw e + } + } +} + +case class ProducerIdBlock(brokerId: Int, blockStartPid: Long, blockEndPid: Long) { + override def toString: String = { + val pidBlockInfo = new StringBuilder + pidBlockInfo.append("(brokerId:" + brokerId) + pidBlockInfo.append(",blockStartPID:" + blockStartPid) + pidBlockInfo.append(",blockEndPID:" + blockEndPid + ")") + pidBlockInfo.toString() + } +} + +class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging { + + this.logIdent = "[ProducerId Manager " + brokerId + "]: " + + private var currentPIDBlock: ProducerIdBlock = null + private var nextPID: Long = -1L + + // grab the first block of PIDs + 
this synchronized { + getNewPidBlock() + nextPID = currentPIDBlock.blockStartPid + } + + private def getNewPidBlock(): Unit = { + var zkWriteComplete = false + while (!zkWriteComplete) { + // refresh current pid block from zookeeper again + val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath) + + // generate the new pid block + currentPIDBlock = dataOpt match { + case Some(data) => + val currPIDBlock = ProducerIdManager.parsePidBlockData(data) + debug(s"Read current pid block $currPIDBlock, Zk path version $zkVersion") + + if (currPIDBlock.blockEndPid > Long.MaxValue - ProducerIdManager.PidBlockSize) { + // we have exhausted all pids (wow!), treat it as a fatal error + fatal(s"Exhausted all pids as the next block's end pid is will has exceeded long type limit (current block end pid is ${currPIDBlock.blockEndPid})") + throw new KafkaException("Have exhausted all pids.") + } + + ProducerIdBlock(brokerId, currPIDBlock.blockEndPid + 1L, currPIDBlock.blockEndPid + ProducerIdManager.PidBlockSize) + case None => + debug(s"There is no pid block yet (Zk path version $zkVersion), creating the first block") + ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1) + } + + val newPIDBlockData = ProducerIdManager.generatePidBlockJson(currentPIDBlock) + + // try to write the new pid block into zookeeper + val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.PidBlockPath, newPIDBlockData, zkVersion, Some(checkPidBlockZkData)) + zkWriteComplete = succeeded + + if (zkWriteComplete) + info(s"Acquired new pid block $currentPIDBlock by writing to Zk with path version $version") + } + } + + private def checkPidBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = { + try { + val expectedPidBlock = ProducerIdManager.parsePidBlockData(expectedData) + val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath) + dataOpt match { + case Some(data) => + val currPIDBlock = 
ProducerIdManager.parsePidBlockData(data) + (currPIDBlock.equals(expectedPidBlock), zkVersion) + case None => + (false, -1) + } + } catch { + case e: Exception => + warn(s"Error while checking for pid block Zk data on path $path: expected data $expectedData", e) + + (false, -1) + } + } + + def nextPid(): Long = { + this synchronized { + // grab a new block of PIDs if this block has been exhausted + if (nextPID > currentPIDBlock.blockEndPid) { + getNewPidBlock() + nextPID = currentPIDBlock.blockStartPid + 1 + } else { + nextPID += 1 + } + + nextPID - 1 + } + } + + def shutdown() { + info(s"Shutdown complete: last PID assigned $nextPID") + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala new file mode 100644 index 0000000..41b4323 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.coordinator + +import java.util.concurrent.atomic.AtomicBoolean + +import kafka.server.KafkaConfig +import kafka.utils.{Logging, ZkUtils} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.utils.Time + +/** + * Transaction coordinator handles message transactions sent by producers and communicates with brokers + * to update the status of ongoing transactions. + * + * Each Kafka server instantiates a transaction coordinator which is responsible for a set of + * producers. Producers with specific transactional ids are assigned to their corresponding coordinators; + * producers with no specific transactional id may talk to a random broker as their coordinator. + */ +object TransactionCoordinator { + + def apply(config: KafkaConfig, zkUtils: ZkUtils, time: Time): TransactionCoordinator = { + val pidManager = new ProducerIdManager(config.brokerId, zkUtils) + new TransactionCoordinator(config.brokerId, pidManager) + } +} + +class TransactionCoordinator(val brokerId: Int, + val pidManager: ProducerIdManager) extends Logging { + + this.logIdent = "[Transaction Coordinator " + brokerId + "]: " + + type InitPidCallback = InitPidResult => Unit + + /* Active flag of the coordinator */ + private val isActive = new AtomicBoolean(false) + + def handleInitPid(transactionalId: String, + responseCallback: InitPidCallback): Unit = { + if (transactionalId == null || transactionalId.isEmpty) { + // if the transactional id is not specified, then always blindly accept the request + // and return a new pid from the pid manager + val pid = pidManager.nextPid() + responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE)) + } else { + // check if it is the assigned coordinator for the transactional id + responseCallback(initPidError(Errors.NOT_COORDINATOR_FOR_GROUP)) + } + } + + /** + * Startup logic executed at the same time when the server starts up. 
+ */ + def startup() { + info("Starting up.") + isActive.set(true) + info("Startup complete.") + } + + /** + * Shutdown logic executed at the same time when server shuts down. + * Ordering of actions should be reversed from the startup process. + */ + def shutdown() { + info("Shutting down.") + isActive.set(false) + pidManager.shutdown() + info("Shutdown complete.") + } + + private def initPidError(error: Errors): InitPidResult = { + InitPidResult(pid = RecordBatch.NO_PRODUCER_ID, epoch = RecordBatch.NO_PRODUCER_EPOCH, error) + } + +} + +case class InitPidResult(pid: Long, epoch: Short, error: Errors) http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 2a81f26..95a6896 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -17,30 +17,30 @@ package kafka.log +import java.io.{File, IOException} +import java.text.NumberFormat +import java.util.concurrent.atomic._ +import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit} + +import com.yammer.metrics.core.Gauge import kafka.api.KAFKA_0_10_0_IV0 -import kafka.utils._ import kafka.common._ +import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec} import kafka.metrics.KafkaMetricsGroup import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata} -import java.io.{File, IOException} -import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} -import java.util.concurrent.atomic._ -import java.text.NumberFormat - +import kafka.utils._ +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException} import 
org.apache.kafka.common.record._ import org.apache.kafka.common.requests.ListOffsetRequest +import org.apache.kafka.common.utils.{Time, Utils} -import scala.collection.Seq import scala.collection.JavaConverters._ -import com.yammer.metrics.core.Gauge -import org.apache.kafka.common.utils.{Time, Utils} -import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec} -import org.apache.kafka.common.TopicPartition +import scala.collection.{Seq, mutable} object LogAppendInfo { val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, - NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) + NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, Map.empty[Long, ProducerAppendInfo], false) } /** @@ -56,6 +56,9 @@ object LogAppendInfo { * @param shallowCount The number of shallow messages * @param validBytes The number of valid bytes * @param offsetsMonotonic Are the offsets in this message set monotonically increasing + * @param producerAppendInfos A map from a Pid to a ProducerAppendInfo, which is used to validate each Record in a + * RecordBatch and keep track of metadata across Records in a RecordBatch. + * @param isDuplicate Indicates whether the message set is a duplicate of a message at the tail of the log. */ case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, @@ -66,8 +69,9 @@ case class LogAppendInfo(var firstOffset: Long, targetCodec: CompressionCodec, shallowCount: Int, validBytes: Int, - offsetsMonotonic: Boolean) - + offsetsMonotonic: Boolean, + producerAppendInfos: Map[Long, ProducerAppendInfo], + isDuplicate: Boolean = false) /** * An append-only log for storing messages. @@ -93,7 +97,8 @@ case class LogAppendInfo(var firstOffset: Long, * @param recoveryPoint The offset at which to begin recovery--i.e. 
the first offset which has not been flushed to disk * @param scheduler The thread pool scheduler used for background actions * @param time The time instance used for checking the clock - * + * @param maxPidExpirationMs The maximum amount of time to wait before a PID is considered expired + * @param pidExpirationCheckIntervalMs How often to check for PIDs which need to be expired */ @threadsafe class Log(@volatile var dir: File, @@ -101,7 +106,10 @@ class Log(@volatile var dir: File, @volatile var logStartOffset: Long = 0L, @volatile var recoveryPoint: Long = 0L, scheduler: Scheduler, - time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup { + time: Time = Time.SYSTEM, + val maxPidExpirationMs: Int = 60 * 60 * 1000, + val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000, + val pidSnapshotCreationIntervalMs: Int = 60 * 1000) extends Logging with KafkaMetricsGroup { import kafka.log.Log._ @@ -118,10 +126,16 @@ class Log(@volatile var dir: File, 0 } + val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir) + @volatile private var nextOffsetMetadata: LogOffsetMetadata = _ + /* Construct and load PID map */ + private val pidMap = new ProducerIdMapping(config, topicPartition, dir, maxPidExpirationMs) + /* the actual segments of the log */ private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] + locally { val startMs = time.milliseconds @@ -131,13 +145,12 @@ class Log(@volatile var dir: File, activeSegment.size.toInt) logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset) + buildAndRecoverPidMap(logEndOffset) info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms" .format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs)) } - val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir) - private val tags = Map("topic" -> topicPartition.topic, "partition" 
-> topicPartition.partition.toString) newGauge("NumLogSegments", @@ -164,6 +177,19 @@ class Log(@volatile var dir: File, }, tags) + scheduler.schedule(name = "PeriodicPidExpirationCheck", fun = () => { + lock synchronized { + pidMap.checkForExpiredPids(time.milliseconds) + } + }, period = pidExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS) + + scheduler.schedule(name = "PeriodicPidSnapshotTask", fun = () => { + lock synchronized { + pidMap.maybeTakeSnapshot() + } + }, period = pidSnapshotCreationIntervalMs, unit = TimeUnit.MILLISECONDS) + + /** The name of this log */ def name = dir.getName() @@ -332,6 +358,47 @@ class Log(@volatile var dir: File, } /** + * Truncates and reloads the PID map of this log and updates the mapping + * in the case it is missing some messages. Note that the id mapping + * starts from a snapshot that is taken strictly before the log end + * offset. Consequently, we need to process the tail of the log to update + * the mapping. + * + * @param lastOffset the offset up to which the PID map is rebuilt (callers pass the log end offset on load and the target offset on truncation) + * + * The existing PID map is updated in place under the log lock; this method does not return a value. + */ + private def buildAndRecoverPidMap(lastOffset: Long) { + lock synchronized { + val currentTimeMs = time.milliseconds + pidMap.truncateAndReload(lastOffset, currentTimeMs) + logSegments(pidMap.mapEndOffset, lastOffset).foreach { segment => + val startOffset = math.max(segment.baseOffset, pidMap.mapEndOffset) + val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue) + val records = fetchDataInfo.records + records.batches.asScala.foreach { batch => + if (batch.hasProducerId) { + // TODO: Currently accessing any of the batch-level headers other than the offset + // or magic causes us to load the full entry into memory. 
It would be better if we + // only loaded the header + val numRecords = (batch.lastOffset - batch.baseOffset + 1).toInt + val pidEntry = ProducerIdEntry(batch.producerEpoch, batch.lastSequence, batch.lastOffset, + numRecords, batch.maxTimestamp) + pidMap.load(batch.producerId, pidEntry, currentTimeMs) + } + } + } + pidMap.cleanFrom(logStartOffset) + } + } + + private[log] def activePids: Map[Long, ProducerIdEntry] = { + lock synchronized { + pidMap.activePids + } + } + + /** * Check if we have the "clean shutdown" file */ private def hasCleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile).exists() @@ -364,10 +431,11 @@ class Log(@volatile var dir: File, * @return Information about the appended messages including the first and last offset. */ def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = { - val appendInfo = analyzeAndValidateRecords(records) - // if we have any valid messages, append them to the log - if (appendInfo.shallowCount == 0) + val appendInfo = analyzeAndValidateRecords(records, isFromClient = assignOffsets) + + // return if we have no valid messages or if this is a duplicate of the last appended entry + if (appendInfo.shallowCount == 0 || appendInfo.isDuplicate) return appendInfo // trim any invalid bytes or partial messages before appending it to the on-disk log @@ -433,7 +501,6 @@ class Log(@volatile var dir: File, maxTimestampInMessages = appendInfo.maxTimestamp, maxOffsetInMessages = appendInfo.lastOffset) - // now append to the log segment.append(firstOffset = appendInfo.firstOffset, largestOffset = appendInfo.lastOffset, @@ -441,6 +508,15 @@ class Log(@volatile var dir: File, shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp, records = validRecords) + // update the PID sequence mapping + for ((pid, producerAppendInfo) <- appendInfo.producerAppendInfos) { + trace(s"Updating pid with sequence: $pid -> ${producerAppendInfo.lastEntry}") + + if (assignOffsets) + 
producerAppendInfo.assignLastOffsetAndTimestamp(appendInfo.lastOffset, appendInfo.maxTimestamp) + pidMap.update(producerAppendInfo) + } + // increment the log end offset updateLogEndOffset(appendInfo.lastOffset + 1) @@ -457,7 +533,7 @@ class Log(@volatile var dir: File, } } - /* + /** * Increment the log start offset if the provided offset is larger. */ def maybeIncrementLogStartOffset(offset: Long) { @@ -476,6 +552,7 @@ class Log(@volatile var dir: File, * <ol> * <li> each message matches its CRC * <li> each message size is valid + * <li> that the sequence numbers of the incoming record batches are consistent with the existing state and with each other. * </ol> * * Also compute the following quantities: @@ -488,7 +565,7 @@ class Log(@volatile var dir: File, * <li> Whether any compression codec is used (if many are used, then the last one is given) * </ol> */ - private def analyzeAndValidateRecords(records: MemoryRecords): LogAppendInfo = { + private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = { var shallowMessageCount = 0 var validBytesCount = 0 var firstOffset = -1L @@ -497,8 +574,12 @@ class Log(@volatile var dir: File, var monotonic = true var maxTimestamp = RecordBatch.NO_TIMESTAMP var offsetOfMaxTimestamp = -1L + var isDuplicate = false + val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]() for (batch <- records.batches.asScala) { + if (isFromClient && batch.magic >= RecordBatch.MAGIC_VALUE_V2 && shallowMessageCount > 0) + throw new InvalidRecordException("Client produce requests should not have more than one batch") // update the first offset if on the first message. For magic versions older than 2, we use the last offset // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message). // For magic version 2, we can get the first offset directly from the batch header. 
@@ -508,6 +589,7 @@ class Log(@volatile var dir: File, // check that offsets are monotonically increasing if (lastOffset >= batch.lastOffset) monotonic = false + // update the last offset seen lastOffset = batch.lastOffset @@ -527,19 +609,43 @@ class Log(@volatile var dir: File, maxTimestamp = batch.maxTimestamp offsetOfMaxTimestamp = lastOffset } + shallowMessageCount += 1 validBytesCount += batchSize val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id) if (messageCodec != NoCompressionCodec) sourceCodec = messageCodec + + val pid = batch.producerId + if (pid != RecordBatch.NO_PRODUCER_ID) { + producerAppendInfos.get(pid) match { + case Some(appendInfo) => appendInfo.append(batch) + case None => + val lastEntry = pidMap.lastEntry(pid).getOrElse(ProducerIdEntry.Empty) + if (isFromClient && lastEntry.isDuplicate(batch)) { + // This request is a duplicate so return the information about the existing entry. Note that for requests + // coming from the client, there will only be one RecordBatch per request, so there will be only one iteration + // of the loop and the values below will not be updated more than once. + isDuplicate = true + firstOffset = lastEntry.firstOffset + lastOffset = lastEntry.lastOffset + maxTimestamp = lastEntry.timestamp + info(s"Detected a duplicate at (firstOffset, lastOffset): (${firstOffset}, ${lastOffset}). 
Ignoring the incoming record.") + } else { + val producerAppendInfo = new ProducerAppendInfo(pid, lastEntry) + producerAppendInfos.put(pid, producerAppendInfo) + producerAppendInfo.append(batch) + } + } + } } // Apply broker-side compression if any val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec) LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, sourceCodec, - targetCodec, shallowMessageCount, validBytesCount, monotonic) + targetCodec, shallowMessageCount, validBytesCount, monotonic, producerAppendInfos.toMap, isDuplicate) } /** @@ -941,6 +1047,7 @@ class Log(@volatile var dir: File, this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) this.logStartOffset = math.min(targetOffset, this.logStartOffset) } + buildAndRecoverPidMap(targetOffset) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 8ddeca9..830f906 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -26,7 +26,7 @@ import com.yammer.metrics.core.Gauge import kafka.common._ import kafka.metrics.KafkaMetricsGroup import kafka.utils._ -import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record} +import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record, RecordBatch} import org.apache.kafka.common.utils.Time import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.MemoryRecords.RecordFilter @@ -219,8 +219,7 @@ class LogCleaner(val config: CleanerConfig, override def doWork() { cleanOrSleep() } - - + override def shutdown() = { initiateShutdown() backOffWaitLatch.countDown() @@ -402,7 +401,7 @@ private[log] class Cleaner(val id: Int, val 
retainDeletes = old.lastModified > deleteHorizonMs info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes." .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) - cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats) + cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, log.activePids, stats) } // trim excess index @@ -449,9 +448,10 @@ private[log] class Cleaner(val id: Int, map: OffsetMap, retainDeletes: Boolean, maxLogMessageSize: Int, + activePids: Map[Long, ProducerIdEntry], stats: CleanerStats) { val logCleanerFilter = new RecordFilter { - def shouldRetain(record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats) + def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats, activePids, recordBatch.producerId) } var position = 0 @@ -493,10 +493,17 @@ private[log] class Cleaner(val id: Int, map: kafka.log.OffsetMap, retainDeletes: Boolean, record: Record, - stats: CleanerStats): Boolean = { + stats: CleanerStats, + activePids: Map[Long, ProducerIdEntry], + pid: Long): Boolean = { if (record.isControlRecord) return true + // retain the entry if it is the last one produced by an active idempotent producer to ensure that + // the PID is not removed from the log before it has been expired + if (RecordBatch.NO_PRODUCER_ID < pid && activePids.get(pid).exists(_.lastOffset == record.offset)) + return true + val pastLatestOffset = record.offset > map.latestOffset if (pastLatestOffset) return true http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 
55669c0..30bc26b 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -56,6 +56,7 @@ object Defaults { val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.LogMessageTimestampDifferenceMaxMs val LeaderReplicationThrottledReplicas = Collections.emptyList[String]() val FollowerReplicationThrottledReplicas = Collections.emptyList[String]() + val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots } case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) { http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index a555420..ec164e2 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -25,9 +25,10 @@ import kafka.utils._ import scala.collection._ import scala.collection.JavaConverters._ import kafka.common.{KafkaException, KafkaStorageException} -import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown} +import kafka.server._ import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future} +import kafka.admin.AdminUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.utils.Time @@ -51,6 +52,7 @@ class LogManager(val logDirs: Array[File], val flushRecoveryOffsetCheckpointMs: Long, val flushStartOffsetCheckpointMs: Long, val retentionCheckMs: Long, + val maxPidExpirationMs: Int, scheduler: Scheduler, val brokerState: BrokerState, time: Time) extends Logging { @@ -166,7 +168,14 @@ class LogManager(val logDirs: Array[File], val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L) - val current = new Log(logDir, 
config, logStartOffset, logRecoveryPoint, scheduler, time) + val current = new Log( + dir = logDir, + config = config, + logStartOffset = logStartOffset, + recoveryPoint = logRecoveryPoint, + maxPidExpirationMs = maxPidExpirationMs, + scheduler = scheduler, + time = time) if (logDir.getName.endsWith(Log.DeleteDirSuffix)) { this.logsToBeDeleted.add(current) } else { @@ -401,7 +410,15 @@ class LogManager(val logDirs: Array[File], val dataDir = nextLogDir() val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition) dir.mkdirs() - val log = new Log(dir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler, time) + + val log = new Log( + dir = dir, + config = config, + logStartOffset = 0L, + recoveryPoint = 0L, + maxPidExpirationMs = maxPidExpirationMs, + scheduler = scheduler, + time = time) logs.put(topicPartition, log) info("Created log for partition [%s,%d] in %s with properties {%s}." .format(topicPartition.topic, @@ -493,7 +510,7 @@ class LogManager(val logDirs: Array[File], // count the number of logs in each parent directory (including 0 for empty directories val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size) val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap - var dirCounts = (zeros ++ logCounts).toBuffer + val dirCounts = (zeros ++ logCounts).toBuffer // choose the directory with the least logs in it val leastLoaded = dirCounts.sortBy(_._2).head @@ -556,3 +573,42 @@ class LogManager(val logDirs: Array[File], } } } + +object LogManager { + def apply(config: KafkaConfig, + zkUtils: ZkUtils, + brokerState: BrokerState, + kafkaScheduler: KafkaScheduler, + time: Time): LogManager = { + val defaultProps = KafkaServer.copyKafkaConfigToLog(config) + val defaultLogConfig = LogConfig(defaultProps) + + val topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) => + topic -> LogConfig.fromProps(defaultProps, configs) + } + + // read the log configurations from zookeeper + val cleanerConfig = 
CleanerConfig(numThreads = config.logCleanerThreads, + dedupeBufferSize = config.logCleanerDedupeBufferSize, + dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor, + ioBufferSize = config.logCleanerIoBufferSize, + maxMessageSize = config.messageMaxBytes, + maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond, + backOffMs = config.logCleanerBackoffMs, + enableCleaner = config.logCleanerEnable) + + new LogManager(logDirs = config.logDirs.map(new File(_)).toArray, + topicConfigs = topicConfigs, + defaultConfig = defaultLogConfig, + cleanerConfig = cleanerConfig, + ioThreads = config.numRecoveryThreadsPerDataDir, + flushCheckMs = config.logFlushSchedulerIntervalMs, + flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, + flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs, + retentionCheckMs = config.logCleanupIntervalMs, + maxPidExpirationMs = config.transactionIdExpirationMs, + scheduler = kafkaScheduler, + brokerState = brokerState, + time = time) + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogValidator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 94e3608..c01a5de 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -62,6 +62,7 @@ private[kafka] object LogValidator extends Logging { assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType, messageTimestampDiffMaxMs) } else { + validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic, messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs) } @@ -214,8 +215,15 @@ private[kafka] object LogValidator extends Logging { } if (!inPlaceAssignment) { + val (pid, epoch, sequence) = { + // 
note that we only reassign offsets for requests coming straight from a producer. For records with MagicV2, + // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records + // with older magic versions, this will always be NO_PRODUCER_ID, etc. + val first = records.batches.asScala.head + (first.producerId, first.producerEpoch, first.baseSequence) + } buildRecordsAndAssignOffsets(messageFormatVersion, offsetCounter, messageTimestampType, - CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords) + CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords, pid, epoch, sequence) } else { // we can update the batch only and write the compressed payload as is val batch = records.batches.iterator.next() @@ -238,10 +246,12 @@ private[kafka] object LogValidator extends Logging { private def buildRecordsAndAssignOffsets(magic: Byte, offsetCounter: LongRef, timestampType: TimestampType, compressionType: CompressionType, logAppendTime: Long, - validatedRecords: Seq[Record]): ValidationAndOffsetAssignResult = { + validatedRecords: Seq[Record], + producerId: Long, epoch: Short, baseSequence: Int): ValidationAndOffsetAssignResult = { val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, validatedRecords.asJava) val buffer = ByteBuffer.allocate(estimatedSize) - val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value, logAppendTime) + val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value, + logAppendTime, producerId, epoch, baseSequence) validatedRecords.foreach { record => builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
