Thanks for the responses. Ewen is correct that I am referring to the
*new* consumer (org.apache.kafka.clients.consumer.KafkaConsumer).

I am extending the consumer to allow my applications more control over
committed offsets.  I really want to get away from ZooKeeper (hence
using Kafka-based offset storage), and re-balancing is something I
haven't really needed to tackle in an automated/seamless way.  Either
way, I'll hold off going further down this road until there is more
interest.

@Gwen
I set up a single consumer without partition.assignment.strategy or
rebalance.callback.class.  I was unable to subscribe to just a topic
("Unknown api code 11" on the broker), but I could subscribe to a
specific TopicPartition.  This makes sense, as I would need to handle
re-balancing outside the consumer.  Things functioned as expected
(well, I have an additional minor fix to the code from KAFKA-2121),
and the only exceptions on the broker were due to closing consumers
(which I have become accustomed to).  My tests are specific to my
extended version of the consumer, but they basically do a little
writing and reading with different serde classes with
application-controlled commits (similar to onSuccess and onFailure
after each record, but with tolerance for out-of-order
acknowledgements).
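
To illustrate the "tolerance for out-of-order acknowledgements" idea, here is a minimal standalone sketch (my own illustration, not code from the patch or from Kafka): the application acks individual offsets in any order, and the tracker exposes the highest offset that is safe to commit, i.e. the point below which every offset has been acknowledged.

```java
import java.util.TreeSet;

// Hypothetical helper: tracks per-partition acknowledgements that may
// arrive out of order, and computes the highest committable offset.
public class OffsetAckTracker {
    // Offsets acked ahead of the contiguous run (gaps still pending).
    private final TreeSet<Long> acked = new TreeSet<Long>();
    // Next offset we are waiting on; everything below it is acknowledged.
    private long committable;

    public OffsetAckTracker(long startOffset) {
        this.committable = startOffset;
    }

    // Record an acknowledgement; offsets may arrive in any order.
    public synchronized void ack(long offset) {
        if (offset < committable)
            return; // already covered by an earlier commit point
        acked.add(offset);
        // Advance past any now-contiguous run of acknowledged offsets.
        while (acked.contains(committable)) {
            acked.remove(committable);
            committable++;
        }
    }

    // The offset to pass to commit(): all offsets below it are acked.
    public synchronized long committableOffset() {
        return committable;
    }
}
```

The application would then periodically commit committableOffset() for each partition; a crash replays only the unacknowledged gap rather than losing acks that arrived early.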

If you are interested, here is the patch of the hack against trunk.

On Thu, Apr 23, 2015 at 10:27 PM, Ewen Cheslack-Postava
<e...@confluent.io> wrote:
> @Neha I think you're mixing up the 0.8.1/0.8.2 update and the 0.8.2/0.8.3
> one that's being discussed here?
>
> I think the original question was about using the *new* consumer ("clients
> consumer") with 0.8.2. Gwen's right: it will use features not even
> implemented in the trunk broker yet, let alone in 0.8.2.
>
> I don't think the "enable.commit.downgrade" type option, or supporting the
> old protocol with the new consumer at all, makes much sense. You'd end up
> with some weird hybrid of simple and high-level consumers -- you could use
> offset storage, but you'd have to manage rebalancing yourself since none of
> the coordinator support would be there.
>
>
> On Thu, Apr 23, 2015 at 9:22 PM, Neha Narkhede <n...@confluent.io> wrote:
>
>> My understanding is that ideally the 0.8.3 consumer should work with an
>> 0.8.2 broker if the offset commit config was set to "zookeeper".
>>
>> The only thing that might not work is offset commit to Kafka, which makes
>> sense since the 0.8.2 broker does not support Kafka based offset
>> management.
>>
>> If we broke all kinds of offset commits, then it seems like a regression,
>> no?
>>
>> On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira <gshap...@cloudera.com>
>> wrote:
>>
>> > I didn't think the 0.8.3 consumer would ever be able to talk to an 0.8.2
>> > broker... there are some essential pieces that are missing in 0.8.2
>> > (Coordinator, Heartbeat, etc.).
>> > Maybe I'm missing something. It would be nice if this worked :)
>> >
>> > Mind sharing what / how you tested? Were there no errors in broker
>> > logs after your fix?
>> >
>> > On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon <lydon.s...@gmail.com>
>> wrote:
>> > > Currently the clients consumer (trunk) sends offset commit requests of
>> > > version 2.  The 0.8.2 brokers fail to handle this particular request
>> > > with a:
>> > >
>> > > java.lang.AssertionError: assertion failed: Version 2 is invalid for
>> > > OffsetCommitRequest. Valid versions are 0 or 1.
>> > >
>> > > I was able to make this work via a forceful downgrade of this
>> > > particular request, but I would like some feedback on whether an
>> > > "enable.commit.downgrade" configuration would be a tolerable method to
>> > > allow 0.8.3 consumers to interact with 0.8.2 brokers.  I'm also
>> > > interested in whether this is even a goal worth pursuing.
>> > >
>> > > Thanks,
>> > > Sean
>> >
>>
>>
>>
>> --
>> Thanks,
>> Neha
>>
>
>
>
> --
> Thanks,
> Ewen
From 31a14a1749cb164bdde0f59951e4d3aae8ce80a1 Mon Sep 17 00:00:00 2001
From: Sean Lydon <sly...@apixio.com>
Date: Fri, 24 Apr 2015 09:29:41 -0700
Subject: [PATCH] Hardcoded changes to downgrade offset_commit to version 1.

---
 .../java/org/apache/kafka/clients/KafkaClient.java | 10 ++++-
 .../org/apache/kafka/clients/NetworkClient.java    | 12 ++++++
 .../kafka/clients/consumer/KafkaConsumer.java      |  2 +-
 .../clients/consumer/internals/Coordinator.java    | 46 ++++++++++++++++++++--
 .../java/org/apache/kafka/clients/MockClient.java  |  5 +++
 5 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 1311f85..e608ca8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -126,9 +126,17 @@ public interface KafkaClient extends Closeable {
      */
     public RequestHeader nextRequestHeader(ApiKeys key);
 
+    /**
+     * Generate a request header for the next request
+     *
+     * @param key The API key of the request
+     * @param version The version of the API key to use for the request
+     */
+    public RequestHeader nextRequestHeader(ApiKeys key, short version);
+
     /**
      * Wake up the client if it is currently blocked waiting for I/O
      */
     public void wakeup();
 
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index b7ae595..28949a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -304,6 +304,18 @@ public class NetworkClient implements KafkaClient {
     }
 
     /**
+     * Generate a request header for the given API key and version
+     *
+     * @param key The api key
+     * @param version The api key's version
+     * @return A request header with the appropriate client id and correlation id
+     */
+    @Override
+    public RequestHeader nextRequestHeader(ApiKeys key, short version) {
+        return new RequestHeader(key.id, version, clientId, correlation++);
+    }
+
+    /**
      * Interrupt the client if it is blocked waiting on I/O.
      */
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 09ecb42..d301be4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -495,7 +495,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             if (keyDeserializer == null) {
                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                         Deserializer.class);
-                this.keyDeserializer.configure(config.originals(), false);
+                this.keyDeserializer.configure(config.originals(), true);
             } else {
                 this.keyDeserializer = keyDeserializer;
             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index e55ab11..f44be48 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -162,11 +162,10 @@ public final class Coordinator {
             Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
             offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
             for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
-                offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), ""));
+                offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), now, ""));
             OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
                 this.generation,
                 this.consumerId,
-                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                 offsetData);
 
             // send request and possibly wait for response if it is blocking
@@ -175,7 +174,7 @@ public final class Coordinator {
             if (blocking) {
                 boolean done;
                 do {
-                    ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
+                    ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT, (short) 1, req.toStruct(), handler, now);
 
                     // check for errors
                     done = true;
@@ -191,7 +190,7 @@ public final class Coordinator {
                     }
                 } while (!done);
             } else {
-                this.client.send(initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now));
+                this.client.send(initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, (short) 1, req.toStruct(), handler, now));
             }
         }
     }
@@ -293,6 +292,29 @@ public final class Coordinator {
     }
 
     /**
+     * Repeatedly attempt to send a request to the coordinator until a response is received (retry if we are
+     * disconnected). Note that this means any requests sent this way must be idempotent.
+     *
+     * @return The response
+     */
+    private ClientResponse blockingCoordinatorRequest(ApiKeys api,
+                                                      short version,
+                                                      Struct request,
+                                                      RequestCompletionHandler handler,
+                                                      long now) {
+        while (true) {
+            ClientRequest coordinatorRequest = initiateCoordinatorRequest(api, version, request, handler, now);
+            ClientResponse coordinatorResponse = sendAndReceive(coordinatorRequest, now);
+            if (coordinatorResponse.wasDisconnected()) {
+                handleCoordinatorDisconnect(coordinatorResponse);
+                Utils.sleep(this.retryBackoffMs);
+            } else {
+                return coordinatorResponse;
+            }
+        }
+    }
+
+    /**
      * Ensure the consumer coordinator is known and we have a ready connection to it.
      */
     private void ensureCoordinatorReady() {
@@ -440,6 +462,22 @@ public final class Coordinator {
     }
 
     /**
+     * Initiate a request to the coordinator, but with specific api version
+     */
+    private ClientRequest initiateCoordinatorRequest(ApiKeys api, short version, Struct request, RequestCompletionHandler handler, long now) {
+
+        // first make sure the coordinator is known and ready
+        ensureCoordinatorReady();
+
+        // create the request for the coordinator
+        log.debug("Issuing request ({},{}: {}) to coordinator {}", api, version, request, this.consumerCoordinator.id());
+
+        RequestHeader header = this.client.nextRequestHeader(api, version);
+        RequestSend send = new RequestSend(this.consumerCoordinator.id(), header, request);
+        return new ClientRequest(now, true, send, handler);
+    }
+
+    /**
      * Attempt to send a request and receive its response.
      *
      * @return The response
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 5e3fab1..098071a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -168,6 +168,11 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
+    public RequestHeader nextRequestHeader(ApiKeys key, short version) {
+        return new RequestHeader(key.id, version, "mock", correlation++);
+    }
+
+    @Override
     public void wakeup() {
     }
 
-- 
1.9.1
