[
https://issues.apache.org/jira/browse/SAMZA-226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061085#comment-14061085
]
Chris Riccomini commented on SAMZA-226:
---------------------------------------
bq. If all sound good, will start working on it. Thank you.
Yep, this sounds correct.
One tricky part is going to be figuring out where to call
SystemAdmin.createChangelogStream. The most logical place seems to be in
TaskStorageManager.
One thing to be careful of here is that there's one TaskStorageManager per
container, which means the createChangelogStream might be executed multiple
times on job start. We have this same problem with the KafkaCheckpointManager,
which tries to create its checkpoint topic in the same way. You'll have to
follow the same pattern as the KafkaCheckpointManager, which is to
pessimistically try to create the changelog in every task storage manager,
and catch the "topic already exists" exception if the creation fails. You
CANNOT fetch the topic's metadata (TopicMetadataRequest) before trying to
create the changelog, because Kafka will AUTOMATICALLY create the topic if it
doesn't exist when the topic metadata request is sent.
Have a look at the KafkaCheckpointManager's createTopic code, and also
SAMZA-289 for the edge case that I'm talking about.
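The pattern described above can be sketched as follows. This is a minimal illustration with hypothetical names (StreamExistsError, create_stream), not Samza's or Kafka's actual API: every container attempts the create unconditionally and treats "already exists" as success, and it never probes for the stream first, since in Kafka a metadata probe would auto-create the topic with broker defaults.

```python
class StreamExistsError(Exception):
    """Raised by the (hypothetical) system client when the stream already exists."""

def create_changelog_stream(client, stream_name):
    """Idempotent create, safe to call from every container on job start.

    Crucially, we do NOT check whether the stream exists first (in Kafka,
    a TopicMetadataRequest would auto-create the topic with broker
    defaults); we just attempt the create and swallow the already-exists
    error, so exactly one container "wins" and the rest proceed.
    """
    try:
        client.create_stream(stream_name,
                             compact=True,
                             segment_bytes=100 * 1024 * 1024)
        return True   # this container created the stream
    except StreamExistsError:
        return False  # another container got there first; that's fine
```

Whichever container loses the race simply continues; the stream it needs is guaranteed to exist either way.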
> Auto-create changelog streams for kv
> ------------------------------------
>
> Key: SAMZA-226
> URL: https://issues.apache.org/jira/browse/SAMZA-226
> Project: Samza
> Issue Type: Bug
> Components: container, kv
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
>
> Currently, changelog topics are not auto-created. This is a frustrating user
> experience, and there are a few useful defaults that should be set that are
> not obvious when creating Kafka topics with log compaction enabled.
> We should have Samza auto-create changelog streams for the kv stores that
> have changelogs enabled.
> In Kafka's case, the changelog topics should be created with compaction
> enabled. They should also be created with a smaller (100mb) default
> [segment.bytes|http://kafka.apache.org/documentation.html#configuration]
> setting. The smaller segment.bytes setting is useful for low-volume
> changelogs. The problem we've seen in the past is that the default
> log.segment.bytes is 1 gig. Kafka's compaction implementation NEVER touches
> the most recent log segment. This means that, if you have a very small state
> store, but execute a lot of deletes/updates (e.g. you've only got maybe 25
> megs of active state, but are deleting and updating it frequently), you will
> always end up with at LEAST 1 gig of state to restore (since the most recent
> segment will always contain non-compacted writes). This is silly since your
> active (compacted) state is really only ~25 megs. Shrinking the segment bytes
> means that you'll have a smaller maximum data size to restore. The trade off
> here is that we'll have more segment files for changelogs, which will
> increase file handles.
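To put rough numbers on the argument above: since compaction never touches the active segment, the worst-case restore size is approximately the compacted state plus one full segment. A back-of-the-envelope calculation (the 25 MB figure is the illustrative one from the description):

```python
def worst_case_restore_bytes(compacted_state_bytes, segment_bytes):
    # Kafka compaction skips the active (most recent) segment, so a
    # restore may replay up to one full segment of uncompacted writes
    # on top of the compacted state.
    return compacted_state_bytes + segment_bytes

MB = 1024 * 1024
state = 25 * MB  # ~25 MB of live, compacted state

default_segments = worst_case_restore_bytes(state, 1024 * MB)  # 1 GB segments
tuned_segments   = worst_case_restore_bytes(state, 100 * MB)   # proposed 100 MB
```

With default segments the restore bound is ~1049 MB for 25 MB of real state; with 100 MB segments it drops to ~125 MB, at the cost of more segment files (and file handles) per changelog.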
> The trick is doing this in a generic way, since we are supporting changelogs
> for more than just Kafka systems. I think the interface to do the stream
> creation belongs in the SystemAdmin interface. It would be nice to have a
> generic SystemAdmin.createStream() interface, but this would require giving
> it Kafka-specific configuration. Another option is to have
> SystemAdmin.createChangelogStream, but this seems a bit hacky at first
> glance. We need to think this part through.
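One way the second option could look, sketched here in Python as a hypothetical shape (this is not the actual SystemAdmin API): the generic interface stays free of Kafka settings, and each implementation bakes in its own system-specific defaults.

```python
from abc import ABC, abstractmethod

class SystemAdmin(ABC):
    """Hypothetical slice of the admin interface under discussion."""

    @abstractmethod
    def create_changelog_stream(self, stream_name, partitions):
        """Create a changelog stream with system-appropriate defaults."""

class KafkaSystemAdmin(SystemAdmin):
    """Kafka implementation: compaction and small segments live here,
    not in the generic interface."""

    def __init__(self):
        self.created = {}  # stand-in for real topic creation

    def create_changelog_stream(self, stream_name, partitions):
        self.created[stream_name] = {
            "partitions": partitions,
            "cleanup.policy": "compact",
            "segment.bytes": 100 * 1024 * 1024,
        }
```

The caller (e.g. the container's storage layer) only ever sees create_changelog_stream(name, partitions); whether that means "compacted Kafka topic with 100 MB segments" or something else entirely is each system's business.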
> [~martinkl], in hello-samza, how are we creating log compacted state stores
> with the appropriate number of partitions? Is this handled as part of
> bin/grid?
--
This message was sent by Atlassian JIRA
(v6.2#6252)