-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18022/#review34677
-----------------------------------------------------------


My views (which hardly matter!) on the points in the 'Questions/comments for 
discussion' section:

- Change in partition assignment would need sync of offsets across brokers and 
subsequent bootstrap. Would be better to address in a separate jira.
- +1 for a single error code for all offsets. OffsetCommitResponse must be 
versioned, as in the future there might be a need for separate error codes.
- It would be good to have separate error codes so that clients are aware of 
what went wrong.
- Lazy offset loading:
  - "clear out the offset cache on becoming a follower": The cleanup thread 
would eventually remove these entries after the retention period. By 
back-of-the-envelope math, the extra offsets won't take up much space in the 
offsets cache. There must be a mechanism (Map[partition -> 
lastKnownLeaderEpoch]?) to figure out that those offsets in the cache are 
stale and that a bootstrap is needed for them. As a downside, the old offsets 
would sit for a long time in the old generation space of the heap, taking up 
a few MBs.
  - How would the offset manager populate Map[partition -> 
lastKnownLeaderEpoch]? It's clear that it can't do that all by itself, which 
implies some coupling with another module.
  - DurableLog: I liked the concept. This might touch a lot of places in the 
codebase and would be better to address in a separate jira.
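
To make the lazy-loading point concrete, here is a minimal sketch of the 
Map[partition -> lastKnownLeaderEpoch] idea, in Python for brevity. All names 
(LazyOffsetCache, load_from_log, current_leader_epoch) are hypothetical and 
not from the patch. Note that the caller has to supply the current leader 
epoch, which is exactly the coupling with another module mentioned above.

```python
from typing import Callable, Dict, Optional, Tuple

# Hypothetical key shape: (group, topic, partition).
OffsetKey = Tuple[str, str, int]


class LazyOffsetCache:
    """Reloads an __offsets partition on the first fetch after an epoch change."""

    def __init__(self, load_from_log: Callable[[int], Dict[OffsetKey, int]]):
        # load_from_log is an assumed hook that reads one offsets partition.
        self._load_from_log = load_from_log
        self._last_known_leader_epoch: Dict[int, int] = {}
        self._offsets: Dict[int, Dict[OffsetKey, int]] = {}

    def fetch(self, offsets_partition: int, current_leader_epoch: int,
              key: OffsetKey) -> Optional[int]:
        # Entries cached under a different epoch are treated as stale.
        if self._last_known_leader_epoch.get(offsets_partition) != current_leader_epoch:
            self._offsets[offsets_partition] = self._load_from_log(offsets_partition)
            self._last_known_leader_epoch[offsets_partition] = current_leader_epoch
        return self._offsets[offsets_partition].get(key)
```

With this shape nothing is cleared on becoming a follower; stale entries are 
simply replaced the next time a fetch arrives under a newer epoch.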


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/18022/#comment64895>

    1. This could be made a 'val' instead of a 'var'.
    2. Are entries ever cleared from this pool? It looks like it will keep 
growing after rebalances.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/18022/#comment64897>

    There are two different names used, possibly for the same thing: offset 
coordinator and offset manager. This might be confusing to someone new to this 
feature.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/18022/#comment64902>

    I am wondering why an infinite loop is needed inside offset commit. A 
finite retry counter would still allow a few reattempts.
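
A minimal sketch of what a bounded retry could look like, in Python for 
brevity. The names commit_once, max_attempts and backoff_seconds are 
hypothetical and not taken from the patch; commit_once stands in for a single 
offset commit attempt that reports success or failure.

```python
import time
from typing import Callable


def commit_with_retries(commit_once: Callable[[], bool],
                        max_attempts: int = 5,
                        backoff_seconds: float = 0.0) -> bool:
    """Try the commit at most max_attempts times instead of looping forever."""
    for attempt in range(1, max_attempts + 1):
        if commit_once():
            return True
        # Only back off if another attempt is still allowed.
        if attempt < max_attempts:
            time.sleep(backoff_seconds)
    return False
```

The caller then decides what to do when this returns False, rather than 
blocking indefinitely.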


- Tejas Patil


On Feb. 12, 2014, 7:50 p.m., Joel Koshy wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18022/
> -----------------------------------------------------------
> 
> (Updated Feb. 12, 2014, 7:50 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1012
>     https://issues.apache.org/jira/browse/KAFKA-1012
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> I picked up most of Tejas' patch and made various edits for review here as I
> would like this to be completed and closed.
> 
> Here is a link to the original implementation wiki:
> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
> 
> A lot of it is the same in this revision of the patch, but there is a bunch
> of refactoring. This patch does not use an "embedded producer" in the
> consumer. i.e., the consumer issues offset commit/fetch requests directly to
> the broker. Also, I decided against doing any kind of request forwarding and
> added a "ConsumerMetadataRequest" that will be used to determine the offset
> coordinator (and subsequently group coordinator that may be useful for the
> client rewrite - see
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design).
> Also, there were some questions on how to support multiple offset manager
> implementations cleanly. After thinking about it I think it makes the code
> simpler and clearer if we just have one good offset storage mechanism (i.e.,
> Kafka-based). Consumers that want to store offsets elsewhere can do so on
> their own. (However, if we do want to do this somewhat cleanly, see the
> discussion on separation of APIs below.)
> 
> Here is a quick recap of how offset management within Kafka works:
> - A special __offsets topic holds consumer offsets.
> - The consumer group serves as the partition key for offsets committed to
>   the __offsets topic. i.e., all offsets for all topics that a group
>   consumes will be in a single partition of the offsets topic.
> - The "group-topic-partition" is the actual (stored) key in each message of
>   the offsets topic.  This facilitates de-duplication (and thus removal) of
>   older offsets.
> - The offset manager also contains an in-memory cache of offsets so it can
>   serve offset fetch requests quickly.
> - Think of commits as a little more than a produce request. If and only if
>   the commit is appended to the __offsets log as a regular produce request
>   we update the offsets cache. So the semantics are identical to a produce
>   request.  Offset fetches return whatever is in the cache. If it is absent,
>   and offsets have not yet been loaded from the logs into the cache (on
>   becoming a leader), then we return an "OffsetsLoading" error code.
> 
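
To make the recap above concrete, here is a minimal in-memory sketch, in 
Python for brevity. It is not the patch's code: the class and method names 
are hypothetical, a plain list stands in for each __offsets partition log, and 
CRC32 is only an assumed stable hash for picking the partition from the group.

```python
import zlib
from typing import Dict, List, Optional, Set, Tuple

OffsetKey = Tuple[str, str, int]  # (group, topic, partition): the stored key


class OffsetsLoading(Exception):
    """Stands in for the OffsetsLoading error code."""


class OffsetManagerSketch:
    def __init__(self, num_offsets_partitions: int):
        self.num_offsets_partitions = num_offsets_partitions
        self.log: Dict[int, List[Tuple[OffsetKey, int]]] = {
            p: [] for p in range(num_offsets_partitions)
        }
        self.cache: Dict[OffsetKey, int] = {}
        self.loading: Set[int] = set()  # offsets partitions still being loaded

    def partition_for(self, group: str) -> int:
        # The group alone is the partition key, so all of a group's offsets
        # land in one partition of the offsets topic.
        return zlib.crc32(group.encode("utf-8")) % self.num_offsets_partitions

    def commit(self, group: str, topic: str, partition: int, offset: int) -> None:
        key = (group, topic, partition)
        # Append first; the cache is updated only after the append succeeds.
        self.log[self.partition_for(group)].append((key, offset))
        self.cache[key] = offset

    def fetch(self, group: str, topic: str, partition: int) -> Optional[int]:
        key = (group, topic, partition)
        if key not in self.cache and self.partition_for(group) in self.loading:
            raise OffsetsLoading("offsets for this group are still loading")
        return self.cache.get(key)
```
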
> (Tejas' wiki has pretty good diagrams that describe the above.)
> 
> Some more details:
> 
> - Atomicity per-commit: One drawback of the Zookeeper-based offset commits
>   is that when we commit multiple offsets (since we don't use
>   multi-write) we have to write offsets serially so it is not atomic.  In
>   this implementation I went with Jun's suggestion on using a compressed
>   message set. This ensures that we will disallow partial commits of a bulk
>   commit. I have hard-coded this to GZIP but maybe it is better to just
>   expose a config. Another option is to introduce an identity compression
>   codec.
> - The main corner cases to consider are when there is leader movement due to
>   broker failures and simultaneous offset commits/fetches. Offset fetches
>   would only occur if there are consumer-side rebalances or shutdowns. The
>   guarantees we want to provide are: (i) successfully acknowledged offset
>   commits should be returned on the next offset fetch - i.e., should not be
>   lost (ii) offset fetches should never return a stale offset.
>   - On becoming a follower of an offsets topic partition:
>     - Partition.makeFollower clears the offset cache of entries belonging to
>       this partition of __offsets.
>     - Any subsequent offset fetch request will find out that the partition
>       is no longer a leader and fail. There is one problem in the existing
>       patch which I will highlight in the RB along with a suggested fix.
>     - Likewise, any subsequent offset commit request will fail (since the
>       underlying producer request will fail). It is okay if the underlying
>       producer request succeeds and the broker becomes a follower for that
>       partition just before the offset cache is updated (since the broker
>       will not serve any OffsetFetchRequests for that partition until it
>       becomes a leader again).
>   - On becoming a leader of an offsets topic partition:
>     - Partition.makeLeader: will load the offsets from the log
>       (asynchronously). While this is in progress, the broker rejects offset
>       fetches to this partition. Offset commits may continue to arrive -
>       i.e., will be appended to the log and then written to the cache. The
>       load loop might actually overwrite it with an earlier offset from the
>       log but that is okay - since it will eventually reach the more recent
>       update in the log and load that into the cache.
> 
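
The claim that the load loop may temporarily overwrite a newer commit but 
still converges can be checked with a small sketch, in Python for brevity. 
The function name replay_log and the list-based log are hypothetical; the 
only assumption is that a commit is appended to the log before it is written 
to the cache, as described above.

```python
from typing import Dict, Iterator, List, Tuple

OffsetKey = Tuple[str, str, int]  # (group, topic, partition)


def replay_log(log: List[Tuple[OffsetKey, int]],
               cache: Dict[OffsetKey, int]) -> Iterator[Dict[OffsetKey, int]]:
    """Replay the log in order, yielding a cache snapshot after each entry."""
    for key, offset in log:
        cache[key] = offset  # may briefly replace a newer value with an older one
        yield dict(cache)


key = ("group", "topic", 0)
log = [(key, 5), (key, 7)]
cache: Dict[OffsetKey, int] = {}

# A commit arrives before the load runs: appended to the log, then cached.
log.append((key, 9))
cache[key] = 9

snapshots = list(replay_log(log, cache))
```

The first snapshot holds the older offset 5, but because the commit is also 
in the log, the replay ends with the cache back at 9.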
> Migrating from ZooKeeper-based offset storage to Kafka-based offset storage:
> - The broker config should set offsets.backup.enabled=true
> - Upgrade the brokers to the latest jar. (Consumers still commit
>   directly to ZooKeeper).
> - Start migrating the consumers over.
> - Consumers will now start sending offset commits to the broker. Since the
>   backup setting is enabled, offsets will also be committed to ZooKeeper.
>   This is necessary when migrating consumers.
> - After _all_ consumers have moved over you can turn off the backup.
> 
> I have made a number of preliminary comments as TODOs in the RB myself (i.e.,
> as a note to myself and others reviewing).
> 
> Questions/comments for discussion
> - Should we explicitly disallow changes to the number of offset topic
>   partitions? This is necessary (or at least prompt with a warning) since
>   changing the number of partitions would affect the partitioning strategy.
> - Should we remove per-partition error codes for offset commits and use just
>   a global error code for the entire request? I'm using compressed message
>   sets for commits.  i.e., the log append for a given commit will either
>   fail entirely or succeed entirely. The OffsetCommitResponse contains
>   per-partition error codes. So if the log append fails for any reason the
>   same error code would apply for all partitions. i.e., it is sufficient to
>   have a global error code. I think we currently have per-partition error
>   codes due to the fact that offset commit requests can include metadata for
>   each offset. The per-partition error code is set to MetadataTooLarge if
>   the metadata entry exceeds the MaxMetadataLength. However, in this case I
>   would prefer to just fail the entire request as opposed to doing partial
>   commits (as I am in the current patch). Anyone have thoughts on this?
> - Error codes: right now I'm using existing error codes (with the exception
>   of OffsetsLoading). It may be better to return more specific error codes
>   but I'm not sure if it matters - since the client-side implementation
>   needs to check for _any_ error and if any error exists (other than
>   MetadataTooLarge) just retry the offset commit/fetch until it succeeds.
>   i.e., the client should not really care about the actual error. If people
>   have any strong preference on this let me know.
> - Separation of APIs: Right now, the offset manager and replica manager are
>   intertwined which is less than ideal. It is okay if offset manager depends
>   on replica manager but not the other way around. Ideally, I would like to
>   have KafkaApis hand off offset commit/fetch requests to the offset manager
>   which then handles it. However, the inter-dependence comes about due to
>   the need to clear out the offset cache on becoming a follower and the need
>   to load offsets on becoming a leader. I think we can improve the
>   separation as follows:
>   - Don't optimistically load offsets/clear offsets on a leader/follower
>     transition. Instead, load offsets only when an offset fetch request
>     arrives for a partition that had not been loaded yet.
>   - The OffsetManager will need to maintain a Map[partition ->
>     lastKnownLeaderEpoch] to determine whether to load offsets or not.
>   - The above will eliminate the reference to OffsetManager from
>     ReplicaManager. KafkaApis still needs to reference the OffsetManager and
>     will need to create the offset commit message to append to the __offsets
>     log.
>   - We can actually avoid the need for KafkaApis to know about offset commit
>     messages as well: in order to do that, we will need to create a
>     "DurableLog" layer on top of LogManager and move all the purgatory stuff
>     in there. The LogManager supports appends/reads from the local log, but
>     does not know anything about the replicas. Instead, we can have a
>     DurableLog layer that depends on ReplicaManager and LogManager and
>     contains the Producer/Fetch-Request purgatories. So OffsetManager will
>     need to depend on this DurableLog component. So KafkaApis can just hand
>     off ProducerRequests, FetchRequests to the DurableLog layer directly. It
>     will hand off OffsetCommit/OffsetFetch requests to the OffsetManager
>     which will then hand it off to the DurableLog layer.
>   - Is the above worth it? I'm not sure it is, especially if we are sticking
>     to only one offset management implementation.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 4d1fa5c 
>   core/src/main/scala/kafka/api/OffsetCommitResponse.scala 9e1795f 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 7036532 
>   core/src/main/scala/kafka/api/RequestKeys.scala c81214f 
>   core/src/main/scala/kafka/client/ClientUtils.scala 1d2f81b 
>   core/src/main/scala/kafka/cluster/Partition.scala 1087a2e 
>   core/src/main/scala/kafka/common/ErrorMapping.scala b0b5dce 
>   core/src/main/scala/kafka/common/OffsetLoadInProgressException.scala PRE-CREATION 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 59608a3 
>   core/src/main/scala/kafka/consumer/ConsoleConsumer.scala dc066c2 
>   core/src/main/scala/kafka/consumer/ConsumerConfig.scala e6875d6 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 6dae149 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 703b2e2 
>   core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala 57b9d2a 
>   core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala 570bf31 
>   core/src/main/scala/kafka/server/KafkaApis.scala ae2df20 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 3c3aafc 
>   core/src/main/scala/kafka/server/KafkaServer.scala 5e34f95 
>   core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 21bba48 
>   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 33d7c2c 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala f0f871c 
>   core/src/main/scala/kafka/tools/TestOffsetManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/Utils.scala a89b046 
>   core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala 31534ca 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala eb274d1 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 6a96d80 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 1317b4c 
>   sbt 944ebf8 
>   system_test/mirror_maker/README da53c14 
>   system_test/mirror_maker/bin/expected.out 0a1bbaf 
>   system_test/mirror_maker/bin/run-test.sh e5e6c08 
>   system_test/mirror_maker/config/blacklisttest.consumer.properties ff12015 
>   system_test/mirror_maker/config/mirror_producer.properties aa8be65 
>   system_test/mirror_maker/config/server_source_1_1.properties 2f070a7 
>   system_test/mirror_maker/config/server_source_1_2.properties f9353e8 
>   system_test/mirror_maker/config/server_source_2_1.properties daa01ad 
>   system_test/mirror_maker/config/server_source_2_2.properties be6fdfc 
>   system_test/mirror_maker/config/server_target_1_1.properties d37955a 
>   system_test/mirror_maker/config/server_target_1_2.properties aa7546c 
>   system_test/mirror_maker/config/whitelisttest_1.consumer.properties ff12015 
>   system_test/mirror_maker/config/whitelisttest_2.consumer.properties f1a902b 
>   system_test/mirror_maker/config/zookeeper_source_1.properties f851796 
>   system_test/mirror_maker/config/zookeeper_source_2.properties d534d18 
>   system_test/mirror_maker/config/zookeeper_target.properties 55a7eb1 
>   system_test/offset_management_testsuite/cluster_config.json PRE-CREATION 
>   system_test/offset_management_testsuite/config/console_consumer.properties PRE-CREATION 
>   system_test/offset_management_testsuite/config/server.properties PRE-CREATION 
>   system_test/offset_management_testsuite/config/zookeeper.properties PRE-CREATION 
>   system_test/offset_management_testsuite/offset_management_test.py PRE-CREATION 
>   system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json PRE-CREATION 
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties PRE-CREATION 
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties PRE-CREATION 
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties PRE-CREATION 
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties PRE-CREATION 
>   system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties PRE-CREATION 
>   system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json PRE-CREATION 
>   system_test/testcase_to_run.json 8252860 
>   system_test/utils/kafka_system_test_utils.py fb4a9c0 
>   system_test/utils/testcase_env.py bee8716 
> 
> Diff: https://reviews.apache.org/r/18022/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Joel Koshy
> 
>
