[jira] [Created] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment
Henry Cai created KAFKA-15620: - Summary: Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment Key: KAFKA-15620 URL: https://issues.apache.org/jira/browse/KAFKA-15620 Project: Kafka Issue Type: Bug Components: log Affects Versions: 3.6.0 Reporter: Henry Cai Fix For: 3.6.1

We use the newly released 3.6.0 with the tiered storage feature turned on ({*}remote.log.storage.system.enable{*}=true) and set up topic tier5 to be remote-storage enabled. We added some data to the topic, and the data was replicated to remote storage. A few days later, when log retention kicked in and the log segment was removed from remote storage, we noticed the following warnings in the server log:

[2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: [RemoteLogSegmentMetadataUpdate\{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1697005926358, brokerId=1043}] (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)
[2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log segment.
(org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No remote log segment metadata found for :RemoteLogSegmentId\{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
at org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
at org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
at org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
at java.base/java.lang.Thread.run(Thread.java:829)

After some debugging, we realized that the problem is there are 2 sets of DELETE_SEGMENT_STARTED/FINISHED pairs in the remote metadata topic for this segment.
We traced the log back to when log retention started and saw that two deletions of the same log segment happened at that time:

[2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach based on the largest record timestamp in the segment (kafka.log.remote.RemoteLogManager$RLMTask)
[2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id={*}YFNCaWjPQFSKCngQ1QcKpA{*}} due to retention time 60480ms breach based on the largest record timestamp in the segment (kafka.log.remote.RemoteLogManager$RLMTask)

We also dumped out the content of the original COPY_SEGMENT_STARTED for this segment (which triggers the generation of the later DELETE_SEGMENT metadata):

[2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: [RemoteLogSegmentMetadata\{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}, startOffset=6387830, endOffset=9578660, brokerId=1043, maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, segmentLeaderEpochs=\{5=6387830, 6=6721329}, segmentSizeInBytes=511987531, customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)

You can see there are 2 leader epochs in this segment: segmentLeaderEpochs=\{5=6387830, 6=6721329}

From the remote log retention code (https://github.com/apache/kafka/blob/3.6.0/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L987), it buckets segments into epochs first and then loops through the epochs. I am not sure whether it should generate one or two DELETE_SEGMENT events for this COPY_SEGMENT_STARTED segment.
If it does need to generate 2 DELETE_SEGMENT metadata entries, the consumer task needs to handle that duplicate-metadata situation (rather than throwing exceptions into the log). -- This message was sent by Atlassian Jira (v8.20.10#820010)
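If two DELETE_SEGMENT pairs per segment are in fact legitimate when a segment spans multiple leader epochs, the metadata consumer could treat a repeated DELETE_SEGMENT_STARTED for the same segment id as a no-op instead of raising RemoteResourceNotFoundException. A minimal sketch of that idea (the class and method names are hypothetical, not the actual RemoteLogMetadataCache API):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: make DELETE_SEGMENT_STARTED handling idempotent by
// remembering which segment ids have already entered the delete path.
public class SegmentDeleteTracker {
    private final Set<String> deleting = new HashSet<>();

    /** Returns true if this event started a new delete, false for a duplicate. */
    public boolean onDeleteSegmentStarted(String segmentId) {
        // Set.add() returns false when the id is already present, i.e. when
        // this is a duplicate event to be ignored rather than an error.
        return deleting.add(segmentId);
    }
}
```

A second event for id YFNCaWjPQFSKCngQ1QcKpA would then simply return false rather than surfacing a warning in the broker log.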
[jira] [Created] (KAFKA-12295) Shallow Mirroring
Henry Cai created KAFKA-12295: - Summary: Shallow Mirroring Key: KAFKA-12295 URL: https://issues.apache.org/jira/browse/KAFKA-12295 Project: Kafka Issue Type: Improvement Components: consumer, core, mirrormaker, producer Reporter: Henry Cai Assignee: Henry Cai Fix For: 2.8.0 KIP-712: https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-8089) High level consumer from MirrorMaker is slow to deal with SSL certification expiration
Henry Cai created KAFKA-8089: Summary: High level consumer from MirrorMaker is slow to deal with SSL certification expiration Key: KAFKA-8089 URL: https://issues.apache.org/jira/browse/KAFKA-8089 Project: Kafka Issue Type: Bug Components: clients, consumer Affects Versions: 2.0.0 Reporter: Henry Cai

We have been using Kafka 2.0's MirrorMaker (which uses the high-level consumer) to do replication. The topic is SSL enabled and the certificate expires at a random time within 12 hours. When the certificate expired, we saw many SSL-related exceptions in the log:

[2019-03-07 18:02:54,128] ERROR [Consumer clientId=kafkamirror-euw1-use1-m10nkafka03-1, groupId=kafkamirror-euw1-use1-m10nkafka03] Connection to node 3005 failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)

This error repeated for several hours. However, even with the SSL error, the preexisting socket connections still worked, so the main fetching activity was not actually affected; but the metadata operations from the client and the heartbeats from the heartbeat thread were affected, since they might open new socket connections. I think those errors most likely originated from those side activities. The situation lasted several hours, until the main fetcher thread tried to open a new connection (usually due to a consumer rebalance); the SSL authentication exception then aborted the operation and MirrorMaker exited. During those hours, the client couldn't get the latest metadata and the heartbeats also faltered (we saw rebalancing triggered because of this).

In NetworkClient.processDisconnection(), when the above ERROR message is printed, could it just throw the AuthenticationException up? That would kill KafkaConsumer.poll() and speed up the certificate recycle (in our case, we would restart MirrorMaker with the new certificate). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
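The proposed change — surface the fatal authentication failure to the caller instead of only logging it — can be sketched with a toy poll loop. Everything here (PollLoop, the Network interface) is illustrative, not the actual NetworkClient code:

```java
// Illustrative sketch: a client loop that rethrows a fatal authentication
// error so poll() dies quickly, instead of logging ERROR and retrying for
// hours until an unrelated rebalance finally kills the consumer.
public class PollLoop {
    static class AuthenticationException extends RuntimeException {
        AuthenticationException(String message) { super(message); }
    }

    interface Network {
        /** Returns null on success, or a pending fatal authentication error. */
        AuthenticationException pollOnce();
    }

    /** Polls up to maxPolls times; throws as soon as an auth error appears. */
    static int runUntilAuthFailure(Network network, int maxPolls) {
        int successes = 0;
        for (int i = 0; i < maxPolls; i++) {
            AuthenticationException fatal = network.pollOnce();
            if (fatal != null) {
                throw fatal; // proposed behavior: fail fast, let the operator restart
            }
            successes++;
        }
        return successes;
    }
}
```

Failing fast lets supervisors restart the process with a fresh certificate instead of limping along on pre-existing connections.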
[jira] [Resolved] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process
[ https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Cai resolved KAFKA-3904. -- Resolution: Duplicate Dup of KAFKA-3619 > File descriptor leaking (Too many open files) for long running stream process > - > > Key: KAFKA-3904 > URL: https://issues.apache.org/jira/browse/KAFKA-3904 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Henry Cai >Assignee: Henry Cai > Labels: architecture, newbie > > I noticed when my application was running long (> 1 day), I will get 'Too > many open files' error. > I used 'lsof' to list all the file descriptors used by the process, it's over > 32K, but most of them belongs to the .lock file, e.g. this same lock file > shows 2700 times. > I looked at the code, I think the problem is in: > File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME); > FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel(); > Each time new RandomAccessFile is called, a new fd will be created, we > probably should either close or reuse this RandomAccessFile object. > lsof result: > java14799 hcai *740u REG9,00 2415928585 > /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock > java14799 hcai *743u REG9,00 2415928585 > /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock > java14799 hcai *746u REG9,00 2415928585 > /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock > java14799 hcai *755u REG9,00 2415928585 > /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock > hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16 | wc >2709 24381 319662 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process
[ https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358038#comment-15358038 ] Henry Cai commented on KAFKA-3904: -- This is a dup of KAFKA-3619 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process
[ https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352387#comment-15352387 ] Henry Cai commented on KAFKA-3904: -- Please take a look at this PR for the fix: https://github.com/apache/kafka/pull/1563 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process
[ https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352126#comment-15352126 ] Henry Cai commented on KAFKA-3904: -- I took a look at FileChannel.open; it looks like it will still create a file descriptor for that channel, so the underlying problem of creating too many file descriptors is still there. I am not a hundred percent sure we can use the new FileChannel.open(), since it relies on the underlying FileSystemProvider.newFileChannel() and some of the implementations throw UnsupportedOperationException. I think I will still use the traditional RandomAccessFile.getChannel and post a PR for this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
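To make the trade-off concrete: both ways of obtaining the lock-file channel consume a file descriptor, so switching to FileChannel.open would not by itself fix the leak — the channel has to be closed or reused either way. A small self-contained JDK demo (not Kafka code):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Opens the same lock file via RandomAccessFile.getChannel() and via
// FileChannel.open(); each call allocates its own file descriptor.
public class LockChannelDemo {
    static boolean bothPathsAllocateFd() throws IOException {
        Path lockFile = Files.createTempFile("state", ".lock");
        FileChannel viaRaf = new RandomAccessFile(lockFile.toFile(), "rw").getChannel();
        // FileChannel.open delegates to FileSystemProvider.newFileChannel and
        // may throw UnsupportedOperationException for exotic providers.
        FileChannel viaOpen = FileChannel.open(lockFile,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        boolean bothOpen = viaRaf.isOpen() && viaOpen.isOpen();
        viaRaf.close();   // without these closes, each lock attempt leaks an fd
        viaOpen.close();
        Files.deleteIfExists(lockFile);
        return bothOpen;
    }
}
```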
[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process
[ https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350018#comment-15350018 ] Henry Cai commented on KAFKA-3904: -- I have the fix also:

-FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
+FileChannel channel = null;
+synchronized (channels) {
+    channel = channels.get(lockFile);
+    if (channel == null) {
+        channel = new RandomAccessFile(lockFile, "rw").getChannel();
+        channels.put(lockFile, channel);
+        log.info("Creating new channel: {} for file: {}", channel, lockFile);
+    }
+}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
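Cleaned up as a standalone sketch, the patch above amounts to a per-file channel cache, so every lock attempt on the same file reuses one descriptor. Names here are illustrative rather than the exact ProcessorStateManager code:

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the cached-channel fix: one FileChannel per lock
// file, created on first use and reused afterwards.
public class ChannelCache {
    private final Map<File, FileChannel> channels = new HashMap<>();

    public synchronized FileChannel channelFor(File lockFile) throws IOException {
        FileChannel channel = channels.get(lockFile);
        if (channel == null) {
            channel = new RandomAccessFile(lockFile, "rw").getChannel();
            channels.put(lockFile, channel);
        }
        return channel; // same instance, and same fd, on every later call
    }
}
```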
[jira] [Updated] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process
[ https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Cai updated KAFKA-3904: - Description:

I noticed when my application was running long (> 1 day), I will get 'Too many open files' error. I used 'lsof' to list all the file descriptors used by the process, it's over 32K, but most of them belongs to the .lock file, e.g. this same lock file shows 2700 times. I looked at the code, I think the problem is in:

File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();

Each time new RandomAccessFile is called, a new fd will be created, we probably should either close or reuse this RandomAccessFile object.

lsof result:
java14799 hcai *740u REG9,00 2415928585 /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
java14799 hcai *743u REG9,00 2415928585 /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
java14799 hcai *746u REG9,00 2415928585 /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
java14799 hcai *755u REG9,00 2415928585 /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16 | wc
2709 24381 319662

was: Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, or the default values are directly used. We need to make them configurable for advanced users. For example, some default values may not work perfectly for some scenarios: https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576 One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar to "StreamsConfig", which defines all related rocksDB options configs, that can be passed as key-value pairs to "StreamsConfig".
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process
Henry Cai created KAFKA-3904: Summary: File descriptor leaking (Too many open files) for long running stream process Key: KAFKA-3904 URL: https://issues.apache.org/jira/browse/KAFKA-3904 Project: Kafka Issue Type: Bug Components: streams Reporter: Henry Cai Assignee: Henry Cai Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, or the default values are directly used. We need to make them configurable for advanced users. For example, some default values may not work perfectly for some scenarios: https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576 One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar to "StreamsConfig", which defines all related rocksDB options configs, that can be passed as key-value pairs to "StreamsConfig". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Work started] (KAFKA-3890) Kafka Streams: task assignment is not maintained on cluster restart or rolling restart
[ https://issues.apache.org/jira/browse/KAFKA-3890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-3890 started by Henry Cai. > Kafka Streams: task assignment is not maintained on cluster restart or > rolling restart > -- > > Key: KAFKA-3890 > URL: https://issues.apache.org/jira/browse/KAFKA-3890 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Henry Cai >Assignee: Henry Cai > Labels: api, newbie > > Currently the task assignment in TaskAssignor is not deterministic. During > cluster restart or rolling restart, even though the participating worker > nodes are the same, but the TaskAssignor is not able to maintain a > deterministic mapping, so about 20% partitions will be reassigned which would > cause state repopulation on cluster restart time. > When the participating worker nodes are not changed, we really just want to > keep the old task assignment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3890) Kafka Streams: task assignment is not maintained on cluster restart or rolling restart
[ https://issues.apache.org/jira/browse/KAFKA-3890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343209#comment-15343209 ] Henry Cai commented on KAFKA-3890: -- PR: https://github.com/apache/kafka/pull/1538 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3890) Kafka Streams: task assignment is not maintained on cluster restart or rolling restart
[ https://issues.apache.org/jira/browse/KAFKA-3890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Cai updated KAFKA-3890: - Description: Currently the task assignment in TaskAssignor is not deterministic. During cluster restart or rolling restart, even though the participating worker nodes are the same, but the TaskAssignor is not able to maintain a deterministic mapping, so about 20% partitions will be reassigned which would cause state repopulation on cluster restart time. When the participating worker nodes are not changed, we really just want to keep the old task assignment. was: Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, or the default values are directly used. We need to make them configurable for advanced users. For example, some default values may not work perfectly for some scenarios: https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576 One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar to "StreamsConfig", which defines all related rocksDB options configs, that can be passed as key-value pairs to "StreamsConfig". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3890) Kafka Streams: task assignment is not maintained on cluster restart or rolling restart
Henry Cai created KAFKA-3890: Summary: Kafka Streams: task assignment is not maintained on cluster restart or rolling restart Key: KAFKA-3890 URL: https://issues.apache.org/jira/browse/KAFKA-3890 Project: Kafka Issue Type: Bug Components: streams Reporter: Henry Cai Assignee: Henry Cai Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, or the default values are directly used. We need to make them configurable for advanced users. For example, some default values may not work perfectly for some scenarios: https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576 One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar to "StreamsConfig", which defines all related rocksDB options configs, that can be passed as key-value pairs to "StreamsConfig". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores
[ https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343115#comment-15343115 ] Henry Cai commented on KAFKA-3740: -- Yes, you or Roger can pick it up. > Add configs for RocksDBStores > - > > Key: KAFKA-3740 > URL: https://issues.apache.org/jira/browse/KAFKA-3740 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Henry Cai > Labels: api, newbie > > Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, > or the default values are directly used. We need to make them configurable > for advanced users. For example, some default values may not work perfectly > for some scenarios: > https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576 > > One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar > to "StreamsConfig", which defines all related rocksDB options configs, that > can be passed as key-value pairs to "StreamsConfig". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3059) ConsumerGroupCommand should allow resetting offsets for consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-3059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332649#comment-15332649 ] Henry Cai commented on KAFKA-3059: -- We would like to have the capability to move the offset to a certain time in the past. It doesn't have to be a precise point; moving to an offset a little before that past time is fine. I think the ListOffsetRequest gives that capability. > ConsumerGroupCommand should allow resetting offsets for consumer groups > --- > > Key: KAFKA-3059 > URL: https://issues.apache.org/jira/browse/KAFKA-3059 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Jason Gustafson > > As discussed here: > http://mail-archives.apache.org/mod_mbox/kafka-users/201601.mbox/%3CCA%2BndhHpf3ib%3Ddsh9zvtfVjRiUjSz%2B%3D8umXm4myW%2BpBsbTYATAQ%40mail.gmail.com%3E > * Given a consumer group, remove all stored offsets > * Given a group and a topic, remove offset for group and topic > * Given a group, topic, partition and offset - set the offset for the > specified partition and group with the given value -- This message was sent by Atlassian JIRA (v6.3.4#6332)
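The look-up-by-time semantics requested here — rewind to an offset at or a little before a past timestamp — can be sketched against a toy in-memory (timestamp -> offset) index. The real broker answers this via ListOffsetRequest rather than an in-memory map, so the class below is purely illustrative:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy sketch of look-up-by-time: a sparse (timestamp -> offset) index where
// offsetBefore() returns the offset of the newest entry at or before the
// target time.
public class TimeIndex {
    private final NavigableMap<Long, Long> index = new TreeMap<>();

    public void put(long timestampMs, long offset) {
        index.put(timestampMs, offset);
    }

    /** Offset to rewind to for the given time, or -1 if nothing that old exists. */
    public long offsetBefore(long timestampMs) {
        Map.Entry<Long, Long> entry = index.floorEntry(timestampMs);
        return entry == null ? -1L : entry.getValue();
    }
}
```

Rewinding to offsetBefore(t) lands a little before time t whenever t falls between two index entries, which matches the "doesn't have to be a precise point" requirement.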
[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores
[ https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15303428#comment-15303428 ] Henry Cai commented on KAFKA-3740: -- It looks like you want two sets of RocksDB settings: one for the K/V store and one for the range-scan store. I think for most small RocksDB instances (size < 5GB), those settings (target file size, or bloom filter) probably won't matter. To make it really flexible, you would need to provide per-store RocksDB settings. E.g. for A join B, if A has small-size records and B has big-size records, you would want to set the targetFileSize smaller for RocksDB A and bigger for RocksDB B. It seems this would lead to the following overlay config structure in StreamsConfig:

rocksdb.default.target_file_size_mb=16
rocksdb.default.use_bloom_filter=true
...
rocksdb.aggregation_style.target_file_size_mb=8
...
rocksdb.join_style.target_file_size_mb=32
...
rocksdb.custom1_style.target_file_size_mb=128

The system understands 'default', 'aggregation_style', and 'join_style' and chooses the style based on the context. An application can optionally associate a RocksDB with a custom style, e.g. custom1_style (this needs some API enhancement). This might get complicated; I think for the initial version, as long as we can provide rocksdb.default.* properties, that would already be a big enhancement. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
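The overlay lookup described in the comment — a per-style setting falling back to rocksdb.default.* — can be sketched as follows. The property names follow the examples in the comment; the class itself is hypothetical, not an actual StreamsConfig API:

```java
import java.util.Properties;

// Hypothetical sketch of the overlay lookup: a per-style RocksDB setting
// (rocksdb.<style>.<key>) falls back to rocksdb.default.<key> when the
// style does not override it.
public class RocksDbOverlayConfig {
    private final Properties props;

    public RocksDbOverlayConfig(Properties props) {
        this.props = props;
    }

    public String get(String style, String key) {
        String styled = props.getProperty("rocksdb." + style + "." + key);
        return styled != null ? styled : props.getProperty("rocksdb.default." + key);
    }
}
```

A join-style store would then see target_file_size_mb=32 while an aggregation-style store that sets nothing falls back to the default of 16.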
[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores
[ https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15294310#comment-15294310 ] Henry Cai commented on KAFKA-3740: -- Worked now. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3740) Add configs for RocksDBStores
[ https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Cai reassigned KAFKA-3740: Assignee: Henry Cai
[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores
[ https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15294270#comment-15294270 ] Henry Cai commented on KAFKA-3740: -- Don't see that 'assign' button
[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores
[ https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15294204#comment-15294204 ] Henry Cai commented on KAFKA-3740: -- You can assign this one to me.
[jira] [Commented] (KAFKA-3185) Allow users to cleanup internal data
[ https://issues.apache.org/jira/browse/KAFKA-3185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15267619#comment-15267619 ] Henry Cai commented on KAFKA-3185: -- Seconded. It's very painful for us to test our demo program: we have to either change our application id or manually remove the RocksDB or Kafka log files. > Allow users to cleanup internal data > > > Key: KAFKA-3185 > URL: https://issues.apache.org/jira/browse/KAFKA-3185 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Guozhang Wang > Priority: Blocker > Labels: user-experience > Fix For: 0.10.1.0 > > > Currently the internal data is managed completely by the Kafka Streams framework > and users cannot clean it up actively. This results in a bad out-of-the-box > user experience, especially for running demo programs, since it leaves behind > internal data (changelog topics, RocksDB files, etc.) that needs to be cleaned > up manually. It would be better to add a > {code} > KafkaStreams.cleanup() > {code} > function call to clean up this internal data programmatically.
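The local half of such a cleanup call is essentially a recursive delete of the per-application state directory. A minimal sketch, assuming a `<state.dir>/<application.id>` layout (the layout and the `StateCleaner` class are assumptions for illustration; changelog topics on the broker would need separate, server-side cleanup):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Sketch: recursively delete the local state directory for one application id.
public class StateCleaner {
    public static void cleanUp(Path stateDir, String applicationId) throws IOException {
        Path appDir = stateDir.resolve(applicationId);
        if (!Files.exists(appDir)) {
            return; // nothing to do
        }
        try (Stream<Path> paths = Files.walk(appDir)) {
            // reverse order so children are deleted before their parent directories
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```

This is what lets a demo restart fresh without changing the application id by hand.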
[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs
[ https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250977#comment-15250977 ] Henry Cai commented on KAFKA-3101: -- We really need this batching/buffering feature in order to adopt Kafka Streams; otherwise the output rate from the aggregation store is too high. Any idea on when this will be implemented? Another use case is similar to this: we have a left outer join between two streams: INSERT INTO C SELECT a, b FROM A LEFT OUTER JOIN B ON a.id = b.id On the output stream, we might see (a, null) followed by (a, b), which cancels the (a, null). To reduce this kind of churn, we can have a policy of a 15-minute buffer: if we don't see (a, b) within 15 minutes, then we emit (a, null). Hopefully your solution for buffering aggregation output can solve the left outer join case as well. The other workaround would be to delay the B stream by 15 minutes, which also needs a buffering mechanism. > Optimize Aggregation Outputs > > > Key: KAFKA-3101 > URL: https://issues.apache.org/jira/browse/KAFKA-3101 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Guozhang Wang > Assignee: Bill Bejeck > Labels: architecture > Fix For: 0.10.1.0 > > > Today we emit one output record for each incoming message for Table / > Windowed Stream Aggregations. For example, say we have a sequence of > aggregate outputs computed from the input stream (assuming there is no agg > value for this key before): > V1, V2, V3, V4, V5 > Then the aggregator will output the following sequence of Change<newValue, oldValue>: > <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4> > which could cost a lot of CPU overhead computing the intermediate results. > Instead, if we can let the underlying state store "remember" the last > emitted old value, we can reduce the number of emits based on some configs. 
> More specifically, we can add one more field in the KV store engine storing > the last emitted old value, which only gets updated when we emit to the > downstream processor. For example: > At Beginning: > Store: key => empty (no agg values yet) > V1 computed: > Update Both in Store: key => (V1, V1), Emit > V2 computed: > Update NewValue in Store: key => (V2, V1), No Emit > V3 computed: > Update NewValue in Store: key => (V3, V1), No Emit > V4 computed: > Update Both in Store: key => (V4, V4), Emit > V5 computed: > Update NewValue in Store: key => (V5, V4), No Emit > One more thing to consider is that we need a "closing" time control on the > not-yet-emitted keys: when some time has elapsed (or the window is to be > closed), we need to check, for any key, whether its current materialized pair > has not yet been emitted (for example (V5, V4) in the sequence above).
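The walkthrough above can be sketched as a small store wrapper that keeps (newValue, lastEmittedOldValue) per key. Here the emit policy is count-based (emit the first update, then every N-th), which is only one possible configuration, not necessarily the one Kafka would choose; the class and its API are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the proposed scheme: remember the last emitted old value per key
// and suppress intermediate Change<new, old> records between emits.
public class DedupAggregateStore {
    static final class Entry { String newValue; String lastEmittedOldValue; int updates; }

    private final Map<String, Entry> store = new HashMap<>();
    private final int emitEvery;
    // Emitted Change<new, old> records, encoded as "new/old" for brevity.
    final List<String> emitted = new ArrayList<>();

    public DedupAggregateStore(int emitEvery) { this.emitEvery = emitEvery; }

    public void update(String key, String newValue) {
        Entry e = store.computeIfAbsent(key, k -> new Entry());
        e.newValue = newValue;   // "Update NewValue in Store"
        e.updates++;
        if ((e.updates - 1) % emitEvery == 0) {      // emit on V1, then every N-th update
            emitted.add(newValue + "/" + e.lastEmittedOldValue);
            e.lastEmittedOldValue = newValue;        // "Update Both": advances only on emit
        }
    }

    // The "closing" time control: flush any key whose latest value was never emitted.
    public void flush() {
        store.values().forEach(e -> {
            if (!e.newValue.equals(e.lastEmittedOldValue)) {
                emitted.add(e.newValue + "/" + e.lastEmittedOldValue);
                e.lastEmittedOldValue = e.newValue;
            }
        });
    }
}
```

With N = 3 and inputs V1..V5, this reproduces the example: emits on V1 and V4, and the pending (V5, V4) goes out on flush.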
[jira] [Updated] (KAFKA-3595) Add capability to specify replication compact option for stream store
[ https://issues.apache.org/jira/browse/KAFKA-3595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Cai updated KAFKA-3595: - Description: Currently state store replication always goes through a compacted kafka topic. For some state stores, e.g. JoinWindow, there are no duplicates in the store, so there is not much benefit in using a compacted topic. The problem with using a compacted topic is that the records can stay on the kafka broker forever. In my use case, my key is ad_id; it's incrementing all the time, not bounded, and I am worried the disk space on the broker for that topic will grow forever. I think we either need the capability to purge the compacted records on the broker, or allow us to specify a different compact option for state store replication. was: Currently in Kafka Streams, the way windows are expired in RocksDB is triggered by new event insertion. When a window is created at T0 with 10 minutes retention, and we see a new record coming with event timestamp T0 + 10 + 1, we will expire that window (remove it) out of RocksDB. In the real world, it's very easy to see events coming with future timestamps (or out-of-order events with big time gaps between them); this way of retiring a window based on one event's timestamp is dangerous. I think at least we need to consider both the event's event time and the server/stream time elapsed. > Add capability to specify replication compact option for stream store > - > > Key: KAFKA-3595 > URL: https://issues.apache.org/jira/browse/KAFKA-3595 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 0.10.1.0 > Reporter: Henry Cai > Assignee: Guozhang Wang > Priority: Minor
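One way to bound a compacted changelog on the broker is to combine compaction with time-based deletion at the topic level; a sketch using the stock CLI (the topic name and retention value are illustrative, and `cleanup.policy=compact,delete` requires a broker version that supports combined cleanup policies):

```shell
# Allow both compaction and time-based deletion on the changelog topic,
# so old records for ever-growing keys like ad_id are eventually purged
# instead of being retained forever.
bin/kafka-configs.sh --zookeeper localhost:2181 \
  --alter --entity-type topics --entity-name my-app-store-changelog \
  --add-config 'cleanup.policy=[compact,delete],retention.ms=604800000'
```

The square brackets are the CLI's syntax for a list-valued config; `retention.ms=604800000` caps retention at seven days.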
[jira] [Updated] (KAFKA-3596) Kafka Streams: Window expiration needs to consider more than event time
[ https://issues.apache.org/jira/browse/KAFKA-3596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Cai updated KAFKA-3596: - Description: Currently in Kafka Streams, the way windows are expired in RocksDB is triggered by new event insertion. When a window is created at T0 with 10 minutes retention, and we see a new record coming with event timestamp T0 + 10 + 1, we will expire that window (remove it) out of RocksDB. In the real world, it's very easy to see events coming with future timestamps (or out-of-order events with big time gaps between them); this way of retiring a window based on one event's timestamp is dangerous. I think at least we need to consider both the event's event time and the server/stream time elapsed. was: Currently state store replication always goes through a compacted kafka topic. For some state stores, e.g. JoinWindow, there are no duplicates in the store, so there is not much benefit in using a compacted topic. The problem with using a compacted topic is that the records can stay on the kafka broker forever. In my use case, my key is ad_id; it's incrementing all the time, not bounded, and I am worried the disk space on the broker for that topic will grow forever. I think we either need the capability to purge the compacted records on the broker, or allow us to specify a different compact option for state store replication. > Kafka Streams: Window expiration needs to consider more than event time > --- > > Key: KAFKA-3596 > URL: https://issues.apache.org/jira/browse/KAFKA-3596 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 0.10.1.0 > Reporter: Henry Cai > Assignee: Guozhang Wang > Priority: Minor
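The safeguard proposed here can be expressed as an expiry check that requires both clocks to agree: the window must be past retention in stream time (the max event timestamp observed so far) and in wall-clock time since the window was created. The two-condition rule below is an illustration of the idea, not Kafka's actual expiration algorithm:

```java
// Illustration: expire a window only when BOTH the observed stream time and
// the wall-clock time elapsed since window creation exceed the retention
// period. A single future-dated event can then no longer expire a freshly
// created window on its own.
public class WindowExpiry {
    public static boolean shouldExpire(long windowEndMs,
                                       long retentionMs,
                                       long maxObservedEventTimeMs,
                                       long windowCreatedWallClockMs,
                                       long nowWallClockMs) {
        boolean pastStreamTime = maxObservedEventTimeMs > windowEndMs + retentionMs;
        boolean pastWallClock = nowWallClockMs - windowCreatedWallClockMs > retentionMs;
        return pastStreamTime && pastWallClock;
    }
}
```

Under this rule, the T0 + 10 + 1 record in the example above advances stream time past retention, but the window survives until at least 10 minutes of server time have also elapsed.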
[jira] [Updated] (KAFKA-3595) Add capability to specify replication compact option for stream store
[ https://issues.apache.org/jira/browse/KAFKA-3595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Cai updated KAFKA-3595: - Description: Currently in Kafka Streams, the way windows are expired in RocksDB is triggered by new event insertion. When a window is created at T0 with 10 minutes retention, and we see a new record coming with event timestamp T0 + 10 + 1, we will expire that window (remove it) out of RocksDB. In the real world, it's very easy to see events coming with future timestamps (or out-of-order events with big time gaps between them); this way of retiring a window based on one event's timestamp is dangerous. I think at least we need to consider both the event's event time and the server/stream time elapsed. was: Currently state store replication always goes through a compacted kafka topic. For some state stores, e.g. JoinWindow, there are no duplicates in the store, so there is not much benefit in using a compacted topic. The problem with using a compacted topic is that the records can stay on the kafka broker forever. In my use case, my key is ad_id; it's incrementing all the time, not bounded, and I am worried the disk space on the broker for that topic will grow forever. I think we either need the capability to purge the compacted records on the broker, or allow us to specify a different compact option for state store replication.
[jira] [Created] (KAFKA-3596) Kafka Streams: Window expiration needs to consider more than event time
Henry Cai created KAFKA-3596: Summary: Kafka Streams: Window expiration needs to consider more than event time Key: KAFKA-3596 URL: https://issues.apache.org/jira/browse/KAFKA-3596 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.1.0 Reporter: Henry Cai Assignee: Guozhang Wang Priority: Minor Currently state store replication always goes through a compacted kafka topic. For some state stores, e.g. JoinWindow, there are no duplicates in the store, so there is not much benefit in using a compacted topic. The problem with using a compacted topic is that the records can stay on the kafka broker forever. In my use case, my key is ad_id; it's incrementing all the time, not bounded, and I am worried the disk space on the broker for that topic will grow forever. I think we either need the capability to purge the compacted records on the broker, or allow us to specify a different compact option for state store replication.
[jira] [Updated] (KAFKA-3595) Add capability to specify replication compact option for stream store
[ https://issues.apache.org/jira/browse/KAFKA-3595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Cai updated KAFKA-3595: - Description: Currently state store replication always goes through a compacted kafka topic. For some state stores, e.g. JoinWindow, there are no duplicates in the store, so there is not much benefit in using a compacted topic. The problem with using a compacted topic is that the records can stay on the kafka broker forever. In my use case, my key is ad_id; it's incrementing all the time, not bounded, and I am worried the disk space on the broker for that topic will grow forever. I think we either need the capability to purge the compacted records on the broker, or allow us to specify a different compact option for state store replication. was: Add the ability to record metrics in the serializer/deserializer components. As it stands, I cannot record latency/sensor metrics since the API does not provide the context at the serde level. Exposing the ProcessorContext at this level may not be the solution; but perhaps change the configure method to take a different config or init context and make the StreamMetrics available in that context along with the config information.
[jira] [Updated] (KAFKA-3595) Add capability to specify replication compact option for stream store
[ https://issues.apache.org/jira/browse/KAFKA-3595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Cai updated KAFKA-3595: - Affects Version/s: (was: 0.9.0.1) 0.10.1.0
[jira] [Updated] (KAFKA-3595) Add capability to specify replication compact option for stream store
[ https://issues.apache.org/jira/browse/KAFKA-3595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Cai updated KAFKA-3595: - Issue Type: Improvement (was: New Feature)
[jira] [Created] (KAFKA-3595) Add capability to specify replication compact option for stream store
Henry Cai created KAFKA-3595: Summary: Add capability to specify replication compact option for stream store Key: KAFKA-3595 URL: https://issues.apache.org/jira/browse/KAFKA-3595 Project: Kafka Issue Type: New Feature Components: streams Affects Versions: 0.9.0.1 Reporter: Henry Cai Assignee: Guozhang Wang Priority: Minor Add the ability to record metrics in the serializer/deserializer components. As it stands, I cannot record latency/sensor metrics since the API does not provide the context at the serde level. Exposing the ProcessorContext at this level may not be the solution; but perhaps change the configure method to take a different config or init context and make the StreamMetrics available in that context along with the config information.