[jira] [Created] (KAFKA-15620) Duplicate remote log DELETE_SEGMENT metadata is generated when there are multiple leader epochs in the segment

2023-10-17 Thread Henry Cai (Jira)
Henry Cai created KAFKA-15620:
-

 Summary: Duplicate remote log DELETE_SEGMENT metadata is generated 
when there are multiple leader epochs in the segment
 Key: KAFKA-15620
 URL: https://issues.apache.org/jira/browse/KAFKA-15620
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 3.6.0
Reporter: Henry Cai
 Fix For: 3.6.1


Using the newly released 3.6.0, we turned on the tiered storage feature:

remote.log.storage.system.enable=true

We set up topic tier5 with remote storage enabled and added some data to the topic; the data was replicated to remote storage. A few days later, when log retention kicked in and the log segment was removed from remote storage, we noticed the following warnings in the server log:

[2023-10-16 22:19:10,971] DEBUG Updating remote log segment metadata: [RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1697005926358, brokerId=1043}] (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)

[2023-10-16 22:19:10,971] WARN Error occurred while updating the remote log segment. (org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore)

org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No remote log segment metadata found for :RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}
        at org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache.updateRemoteLogSegmentMetadata(RemoteLogMetadataCache.java:161)
        at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.handleRemoteLogSegmentMetadataUpdate(RemotePartitionMetadataStore.java:89)
        at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataEventHandler.handleRemoteLogMetadata(RemotePartitionMetadataEventHandler.java:33)
        at org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.processConsumerRecord(ConsumerTask.java:157)
        at org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask.run(ConsumerTask.java:133)
        at java.base/java.lang.Thread.run(Thread.java:829)

 

After some debugging, I realized the problem is that there are two DELETE_SEGMENT_STARTED/FINISHED pairs in the remote metadata topic for this segment. Tracing the log back to when log retention started, I saw two deletions of the same log segment happening at that time:

 

[2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} due to retention time 60480ms breach based on the largest record timestamp in the segment (kafka.log.remote.RemoteLogManager$RLMTask)

[2023-10-10 23:32:05,929] INFO [RemoteLogManager=1043 partition=QelVeVmER5CkjrzIiF07PQ:tier5-0] About to delete remote log segment RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA} due to retention time 60480ms breach based on the largest record timestamp in the segment (kafka.log.remote.RemoteLogManager$RLMTask)

 

I also dumped out the content of the original COPY_SEGMENT_STARTED for this segment (which triggers the generation of the later DELETE_SEGMENT metadata):

[2023-10-16 22:19:10,894] DEBUG Adding to in-progress state: [RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=QelVeVmER5CkjrzIiF07PQ:tier5-0, id=YFNCaWjPQFSKCngQ1QcKpA}, startOffset=6387830, endOffset=9578660, brokerId=1043, maxTimestampMs=1696401123036, eventTimestampMs=1696401127062, segmentLeaderEpochs={5=6387830, 6=6721329}, segmentSizeInBytes=511987531, customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}] (org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataCache)

 

You can see there are 2 leader epochs in this segment: segmentLeaderEpochs={5=6387830, 6=6721329}

From the remote log retention code (https://github.com/apache/kafka/blob/3.6.0/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L987), it buckets the segments into leader epochs first and then loops through the epochs.

I am not sure whether it should generate one or two DELETE_SEGMENT events for this COPY_SEGMENT_STARTED segment. If it does need to generate two DELETE_SEGMENT metadata records, then the consumer task needs to handle the duplicate metadata gracefully instead of throwing exceptions into the log.
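For illustration only, here is a minimal sketch (not the actual RemoteLogManager code) of how the retention pass could deduplicate by segment id, so that a segment spanning multiple leader epochs is scheduled for deletion only once; isBreachedByRetentionTime() and deleteRemoteLogSegment() are hypothetical helpers standing in for the real retention checks:

{code}
// Sketch only -- not the real RemoteLogManager implementation.
// isBreachedByRetentionTime(...) and deleteRemoteLogSegment(...) are hypothetical helpers.
Set<RemoteLogSegmentId> scheduledForDeletion = new HashSet<>();
for (Integer epoch : leaderEpochsOfPartition) {
    Iterator<RemoteLogSegmentMetadata> segments =
            remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
    while (segments.hasNext()) {
        RemoteLogSegmentMetadata segment = segments.next();
        // Set.add() returns false if this segment was already seen under an earlier epoch,
        // so DELETE_SEGMENT_STARTED/FINISHED would be emitted at most once per segment.
        if (isBreachedByRetentionTime(segment)
                && scheduledForDeletion.add(segment.remoteLogSegmentId())) {
            deleteRemoteLogSegment(segment);
        }
    }
}
{code}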

 





[jira] [Created] (KAFKA-12295) Shallow Mirroring

2021-02-04 Thread Henry Cai (Jira)
Henry Cai created KAFKA-12295:
-

 Summary: Shallow Mirroring
 Key: KAFKA-12295
 URL: https://issues.apache.org/jira/browse/KAFKA-12295
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, core, mirrormaker, producer 
Reporter: Henry Cai
Assignee: Henry Cai
 Fix For: 2.8.0


KIP-712: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring





[jira] [Created] (KAFKA-8089) High level consumer from MirrorMaker is slow to deal with SSL certification expiration

2019-03-11 Thread Henry Cai (JIRA)
Henry Cai created KAFKA-8089:


 Summary: High level consumer from MirrorMaker is slow to deal with 
SSL certification expiration
 Key: KAFKA-8089
 URL: https://issues.apache.org/jira/browse/KAFKA-8089
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 2.0.0
Reporter: Henry Cai


We have been using Kafka 2.0's MirrorMaker (which uses the high-level consumer) to do replication. The topic is SSL enabled, and the certificate expires at a random time within 12 hours. When the certificate expires, we see many SSL-related exceptions in the log:
 
[2019-03-07 18:02:54,128] ERROR [Consumer 
clientId=kafkamirror-euw1-use1-m10nkafka03-1, 
groupId=kafkamirror-euw1-use1-m10nkafka03] Connection to node 3005 failed 
authentication due to: SSL handshake failed 
(org.apache.kafka.clients.NetworkClient)

This error will repeat for several hours.

However, even with the SSL errors, the preexisting socket connections still work, so the main fetching activity is not actually affected. The metadata operations from the client and the heartbeats from the heartbeat thread are affected, though, since they may open new socket connections; I think those errors most likely originate from these side activities.

The situation lasts several hours, until the main fetcher thread tries to open a new connection (usually due to a consumer rebalance); at that point the SSL AuthenticationException aborts the operation and MirrorMaker exits.

During those several hours, the client cannot get the latest metadata, and the heartbeats also falter (we see rebalancing triggered because of this).

In NetworkClient.processDisconnection(), when the method prints the ERROR message, could it just throw the AuthenticationException up? That would kill KafkaConsumer.poll() and speed up the certificate recycling (in our case, we would restart MirrorMaker with the new certificate).
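Independent of such a change, here is a minimal sketch (assumed application-level code, not MirrorMaker itself; mirror() and the running flag are stand-ins) of treating an authentication failure surfaced by poll() as fatal so the process can be restarted with the renewed certificate:

{code}
// Sketch of an application-level fallback, not the MirrorMaker code itself.
// mirror(records) and the running flag stand in for the real replication loop details.
void runMirrorLoop(KafkaConsumer<byte[], byte[]> consumer) {
    try {
        while (running) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            mirror(records);
        }
    } catch (AuthenticationException e) {
        // Treat the failure as fatal so the process is restarted with the new certificate.
        log.error("Fatal SSL authentication failure, exiting", e);
        System.exit(1);
    } finally {
        consumer.close();
    }
}
{code}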
 





[jira] [Resolved] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-30 Thread Henry Cai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Cai resolved KAFKA-3904.
--
Resolution: Duplicate

Dup of KAFKA-3619

> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: architecture, newbie
>
> I noticed when my application was running long (> 1 day), I will get 'Too 
> many open files' error.
> I used 'lsof' to list all the file descriptors used by the process, it's over 
> 32K, but most of them belongs to the .lock file, e.g. this same lock file 
> shows 2700 times.
> I looked at the code, I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd will be created, we 
> probably should either close or reuse this RandomAccessFile object.
> lsof result:
> java14799 hcai *740u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *743u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *746u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *755u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16  | wc
>2709   24381  319662





[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-30 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358038#comment-15358038
 ] 

Henry Cai commented on KAFKA-3904:
--

This is a dup of KAFKA-3619

> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: architecture, newbie
>
> I noticed when my application was running long (> 1 day), I will get 'Too 
> many open files' error.
> I used 'lsof' to list all the file descriptors used by the process, it's over 
> 32K, but most of them belongs to the .lock file, e.g. this same lock file 
> shows 2700 times.
> I looked at the code, I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd will be created, we 
> probably should either close or reuse this RandomAccessFile object.
> lsof result:
> java14799 hcai *740u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *743u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *746u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *755u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16  | wc
>2709   24381  319662





[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-27 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352387#comment-15352387
 ] 

Henry Cai commented on KAFKA-3904:
--

Please take a look at this PR for the fix:

https://github.com/apache/kafka/pull/1563

> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: architecture, newbie
>
> I noticed when my application was running long (> 1 day), I will get 'Too 
> many open files' error.
> I used 'lsof' to list all the file descriptors used by the process, it's over 
> 32K, but most of them belongs to the .lock file, e.g. this same lock file 
> shows 2700 times.
> I looked at the code, I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd will be created, we 
> probably should either close or reuse this RandomAccessFile object.
> lsof result:
> java14799 hcai *740u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *743u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *746u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *755u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16  | wc
>2709   24381  319662





[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-27 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352126#comment-15352126
 ] 

Henry Cai commented on KAFKA-3904:
--

I took a look at FileChannel.open; it looks like it will still create a file descriptor for that channel, so the underlying problem of creating too many file descriptors is still there.

I am not a hundred percent sure we can use the new FileChannel.open(), since it relies on the underlying FileSystemProvider.newFileChannel(), and some of the implementations throw UnsupportedOperationException.

I think I will still use the traditional RandomAccessFile.getChannel() and post a PR for this.
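For reference, a small sketch of the two alternatives being compared; either way an OS file descriptor is opened, so the real fix is caching or closing the channel rather than picking one factory over the other (lockFile as in the snippets above):

{code}
// Both approaches open a file descriptor for the lock file.
FileChannel viaRandomAccessFile = new RandomAccessFile(lockFile, "rw").getChannel();

// java.nio alternative; delegates to the provider's newFileChannel() implementation.
FileChannel viaNio = FileChannel.open(lockFile.toPath(),
        StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
{code}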


> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: architecture, newbie
>
> I noticed when my application was running long (> 1 day), I will get 'Too 
> many open files' error.
> I used 'lsof' to list all the file descriptors used by the process, it's over 
> 32K, but most of them belongs to the .lock file, e.g. this same lock file 
> shows 2700 times.
> I looked at the code, I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd will be created, we 
> probably should either close or reuse this RandomAccessFile object.
> lsof result:
> java14799 hcai *740u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *743u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *746u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *755u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16  | wc
>2709   24381  319662





[jira] [Commented] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-26 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350018#comment-15350018
 ] 

Henry Cai commented on KAFKA-3904:
--

I also have the fix:

-        FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
+        FileChannel channel = null;
+        synchronized (channels) {
+            channel = channels.get(lockFile);
+            if (channel == null) {
+                channel = new RandomAccessFile(lockFile, "rw").getChannel();
+                channels.put(lockFile, channel);
+                log.info("Creating new channel: {} for file: {}", channel, lockFile);
+            }
+        }

> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: api, newbie
>
> I noticed when my application was running long (> 1 day), I will get 'Too 
> many open files' error.
> I used 'lsof' to list all the file descriptors used by the process, it's over 
> 32K, but most of them belongs to the .lock file, e.g. this same lock file 
> shows 2700 times.
> I looked at the code, I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd will be created, we 
> probably should either close or reuse this RandomAccessFile object.
> lsof result:
> java14799 hcai *740u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *743u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *746u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *755u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16  | wc
>2709   24381  319662





[jira] [Updated] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-26 Thread Henry Cai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Cai updated KAFKA-3904:
-
Description: 
I noticed that when my application had been running for a long time (> 1 day), I would get a 'Too many open files' error.

I used 'lsof' to list all the file descriptors used by the process; there were over 32K, and most of them belonged to the .lock file, e.g. the same lock file showed up 2,700 times.

I looked at the code, and I think the problem is in:

File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();

Each time new RandomAccessFile(...) is called, a new fd is created; we should probably either close or reuse this RandomAccessFile object.

lsof result:

java14799 hcai *740u   REG9,00 2415928585 
/mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock

java14799 hcai *743u   REG9,00 2415928585 
/mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock

java14799 hcai *746u   REG9,00 2415928585 
/mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock

java14799 hcai *755u   REG9,00 2415928585 
/mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock

hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16  | wc

   2709   24381  319662

  was:
Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, or 
the default values are directly used. We need to make them configurable for 
advanced users. For example, some default values may not work perfectly for 
some scenarios: 
https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576
 

One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar 
to "StreamsConfig", which defines all related rocksDB options configs, that can 
be passed as key-value pairs to "StreamsConfig".


> File descriptor leaking (Too many open files) for long running stream process
> -
>
> Key: KAFKA-3904
> URL: https://issues.apache.org/jira/browse/KAFKA-3904
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: api, newbie
>
> I noticed when my application was running long (> 1 day), I will get 'Too 
> many open files' error.
> I used 'lsof' to list all the file descriptors used by the process, it's over 
> 32K, but most of them belongs to the .lock file, e.g. this same lock file 
> shows 2700 times.
> I looked at the code, I think the problem is in:
> File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
> FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
> Each time new RandomAccessFile is called, a new fd will be created, we 
> probably should either close or reuse this RandomAccessFile object.
> lsof result:
> java14799 hcai *740u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *743u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *746u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> java14799 hcai *755u   REG9,00 2415928585 
> /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
> hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16  | wc
>2709   24381  319662





[jira] [Created] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process

2016-06-26 Thread Henry Cai (JIRA)
Henry Cai created KAFKA-3904:


 Summary: File descriptor leaking (Too many open files) for long 
running stream process
 Key: KAFKA-3904
 URL: https://issues.apache.org/jira/browse/KAFKA-3904
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Henry Cai
Assignee: Henry Cai


Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, or 
the default values are directly used. We need to make them configurable for 
advanced users. For example, some default values may not work perfectly for 
some scenarios: 
https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576
 

One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar 
to "StreamsConfig", which defines all related rocksDB options configs, that can 
be passed as key-value pairs to "StreamsConfig".





[jira] [Work started] (KAFKA-3890) Kafka Streams: task assignment is not maintained on cluster restart or rolling restart

2016-06-21 Thread Henry Cai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3890 started by Henry Cai.

> Kafka Streams: task assignment is not maintained on cluster restart or 
> rolling restart
> --
>
> Key: KAFKA-3890
> URL: https://issues.apache.org/jira/browse/KAFKA-3890
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: api, newbie
>
> Currently the task assignment in TaskAssignor is not deterministic.  During 
> cluster restart or rolling restart, even though the participating worker 
> nodes are the same, but the TaskAssignor is not able to maintain a 
> deterministic mapping, so about 20% partitions will be reassigned which would 
> cause state repopulation on cluster restart time.
> When the participating worker nodes are not changed, we really just want to 
> keep the old task assignment.





[jira] [Commented] (KAFKA-3890) Kafka Streams: task assignment is not maintained on cluster restart or rolling restart

2016-06-21 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343209#comment-15343209
 ] 

Henry Cai commented on KAFKA-3890:
--

PR: https://github.com/apache/kafka/pull/1538

> Kafka Streams: task assignment is not maintained on cluster restart or 
> rolling restart
> --
>
> Key: KAFKA-3890
> URL: https://issues.apache.org/jira/browse/KAFKA-3890
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: api, newbie
>
> Currently the task assignment in TaskAssignor is not deterministic.  During 
> cluster restart or rolling restart, even though the participating worker 
> nodes are the same, but the TaskAssignor is not able to maintain a 
> deterministic mapping, so about 20% partitions will be reassigned which would 
> cause state repopulation on cluster restart time.
> When the participating worker nodes are not changed, we really just want to 
> keep the old task assignment.





[jira] [Updated] (KAFKA-3890) Kafka Streams: task assignment is not maintained on cluster restart or rolling restart

2016-06-21 Thread Henry Cai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Cai updated KAFKA-3890:
-
Description: 
Currently the task assignment in TaskAssignor is not deterministic. During a cluster restart or rolling restart, even though the participating worker nodes are the same, the TaskAssignor is not able to maintain a deterministic mapping, so about 20% of the partitions are reassigned, which causes state repopulation at cluster restart time.

When the participating worker nodes have not changed, we really just want to keep the old task assignment.


  was:
Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, or 
the default values are directly used. We need to make them configurable for 
advanced users. For example, some default values may not work perfectly for 
some scenarios: 
https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576
 

One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar 
to "StreamsConfig", which defines all related rocksDB options configs, that can 
be passed as key-value pairs to "StreamsConfig".


> Kafka Streams: task assignment is not maintained on cluster restart or 
> rolling restart
> --
>
> Key: KAFKA-3890
> URL: https://issues.apache.org/jira/browse/KAFKA-3890
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Henry Cai
>Assignee: Henry Cai
>  Labels: api, newbie
>
> Currently the task assignment in TaskAssignor is not deterministic.  During 
> cluster restart or rolling restart, even though the participating worker 
> nodes are the same, but the TaskAssignor is not able to maintain a 
> deterministic mapping, so about 20% partitions will be reassigned which would 
> cause state repopulation on cluster restart time.
> When the participating worker nodes are not changed, we really just want to 
> keep the old task assignment.





[jira] [Created] (KAFKA-3890) Kafka Streams: task assignment is not maintained on cluster restart or rolling restart

2016-06-21 Thread Henry Cai (JIRA)
Henry Cai created KAFKA-3890:


 Summary: Kafka Streams: task assignment is not maintained on 
cluster restart or rolling restart
 Key: KAFKA-3890
 URL: https://issues.apache.org/jira/browse/KAFKA-3890
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Henry Cai
Assignee: Henry Cai


Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, or 
the default values are directly used. We need to make them configurable for 
advanced users. For example, some default values may not work perfectly for 
some scenarios: 
https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576
 

One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar 
to "StreamsConfig", which defines all related rocksDB options configs, that can 
be passed as key-value pairs to "StreamsConfig".





[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores

2016-06-21 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343115#comment-15343115
 ] 

Henry Cai commented on KAFKA-3740:
--

Yes, you or Roger can pick it up.

> Add configs for RocksDBStores
> -
>
> Key: KAFKA-3740
> URL: https://issues.apache.org/jira/browse/KAFKA-3740
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Henry Cai
>  Labels: api, newbie
>
> Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, 
> or the default values are directly used. We need to make them configurable 
> for advanced users. For example, some default values may not work perfectly 
> for some scenarios: 
> https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576
>  
> One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar 
> to "StreamsConfig", which defines all related rocksDB options configs, that 
> can be passed as key-value pairs to "StreamsConfig".





[jira] [Commented] (KAFKA-3059) ConsumerGroupCommand should allow resetting offsets for consumer groups

2016-06-15 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15332649#comment-15332649
 ] 

Henry Cai commented on KAFKA-3059:
--

We would like the capability to move the offset to a certain time in the past. It doesn't have to be a precise point; moving to an offset a little before that time is fine. I think ListOffsetRequest provides that capability.
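As a sketch of what that could look like from a client, using the consumer's later offsetsForTimes() API (which is backed by ListOffsetRequest); targetTimestampMs is an assumed variable:

{code}
// Sketch: rewind the assigned partitions to the first offsets at or after a target timestamp.
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition tp : consumer.assignment()) {
    timestampsToSearch.put(tp, targetTimestampMs);
}
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
    if (entry.getValue() != null) {
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
}
consumer.commitSync();  // persist the rewound positions for the consumer group
{code}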

> ConsumerGroupCommand should allow resetting offsets for consumer groups
> ---
>
> Key: KAFKA-3059
> URL: https://issues.apache.org/jira/browse/KAFKA-3059
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Jason Gustafson
>
> As discussed here:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201601.mbox/%3CCA%2BndhHpf3ib%3Ddsh9zvtfVjRiUjSz%2B%3D8umXm4myW%2BpBsbTYATAQ%40mail.gmail.com%3E
> * Given a consumer group, remove all stored offsets
> * Given a group and a topic, remove offset for group  and topic
> * Given a group, topic, partition and offset - set the offset for the 
> specified partition and group with the given value





[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores

2016-05-26 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303428#comment-15303428
 ] 

Henry Cai commented on KAFKA-3740:
--

It looks like you want two sets of RocksDB settings, one for the K/V store and one for the range-scan store. I think for most small RocksDB instances (size < 5 GB), those settings (target file size, bloom filter) probably won't matter.

To make it really flexible, you would need to provide per-store RocksDB settings. E.g. for A join B, if A has small records and B has big records, you would want to set targetFileSize smaller for RocksDB A and bigger for RocksDB B. That suggests overlay config structures like the following in StreamsConfig:

rocksdb.default.target_file_size_mb=16
rocksdb.default.use_bloom_filter=true
...
rocksdb.aggregation_style.target_file_size_mb=8
...
rocksdb.join_style.target_file_size_mb=32
...
rocksdb.custom1_style.target_file_size_mb=128

The system would understand 'default', 'aggregation_style', and 'join_style' and choose the appropriate style based on the context.

An application could optionally associate a RocksDB store with a custom style, e.g. custom1_style (this needs some API enhancement).

This might get more complicated; for the initial version, as long as we can provide rocksdb.default.* properties, that would already be a big enhancement.
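A minimal sketch of what per-store tuning could look like with a pluggable config-setter hook (this uses the RocksDBConfigSetter interface from later Kafka Streams versions, wired in via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG; the store-name matching and sizes are purely illustrative):

{code}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Sketch only; values are illustrative, not recommendations.
public class PerStoreRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        if (storeName.contains("join")) {
            options.setTargetFileSizeBase(32 * 1024 * 1024L);  // bigger SST files for join stores
        } else {
            options.setTargetFileSizeBase(16 * 1024 * 1024L);  // "default" style
        }
    }

    // Required by newer versions of the interface; nothing extra to release here.
    public void close(final String storeName, final Options options) { }
}
{code}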


> Add configs for RocksDBStores
> -
>
> Key: KAFKA-3740
> URL: https://issues.apache.org/jira/browse/KAFKA-3740
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Henry Cai
>  Labels: api, newbie
>
> Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, 
> or the default values are directly used. We need to make them configurable 
> for advanced users. For example, some default values may not work perfectly 
> for some scenarios: 
> https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576
>  
> One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar 
> to "StreamsConfig", which defines all related rocksDB options configs, that 
> can be passed as key-value pairs to "StreamsConfig".





[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores

2016-05-20 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15294310#comment-15294310
 ] 

Henry Cai commented on KAFKA-3740:
--

Worked now.

> Add configs for RocksDBStores
> -
>
> Key: KAFKA-3740
> URL: https://issues.apache.org/jira/browse/KAFKA-3740
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Henry Cai
>  Labels: api, newbie
>
> Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, 
> or the default values are directly used. We need to make them configurable 
> for advanced users. For example, some default values may not work perfectly 
> for some scenarios: 
> https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576
>  
> One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar 
> to "StreamsConfig", which defines all related rocksDB options configs, that 
> can be passed as key-value pairs to "StreamsConfig".





[jira] [Assigned] (KAFKA-3740) Add configs for RocksDBStores

2016-05-20 Thread Henry Cai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Cai reassigned KAFKA-3740:


Assignee: Henry Cai

> Add configs for RocksDBStores
> -
>
> Key: KAFKA-3740
> URL: https://issues.apache.org/jira/browse/KAFKA-3740
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Henry Cai
>  Labels: api, newbie
>
> Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, 
> or the default values are directly used. We need to make them configurable 
> for advanced users. For example, some default values may not work perfectly 
> for some scenarios: 
> https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576
>  
> One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar 
> to "StreamsConfig", which defines all related rocksDB options configs, that 
> can be passed as key-value pairs to "StreamsConfig".





[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores

2016-05-20 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15294270#comment-15294270
 ] 

Henry Cai commented on KAFKA-3740:
--

I don't see the 'assign' button.

> Add configs for RocksDBStores
> -
>
> Key: KAFKA-3740
> URL: https://issues.apache.org/jira/browse/KAFKA-3740
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, newbie
>
> Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, 
> or the default values are directly used. We need to make them configurable 
> for advanced users. For example, some default values may not work perfectly 
> for some scenarios: 
> https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576
>  
> One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar 
> to "StreamsConfig", which defines all related rocksDB options configs, that 
> can be passed as key-value pairs to "StreamsConfig".





[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores

2016-05-20 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15294204#comment-15294204
 ] 

Henry Cai commented on KAFKA-3740:
--

You can assign this one to me.

> Add configs for RocksDBStores
> -
>
> Key: KAFKA-3740
> URL: https://issues.apache.org/jira/browse/KAFKA-3740
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, newbie
>
> Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, 
> or the default values are directly used. We need to make them configurable 
> for advanced users. For example, some default values may not work perfectly 
> for some scenarios: 
> https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576
>  
> One way of doing that is to introduce a "RocksDBStoreConfigs" objects similar 
> to "StreamsConfig", which defines all related rocksDB options configs, that 
> can be passed as key-value pairs to "StreamsConfig".





[jira] [Commented] (KAFKA-3185) Allow users to cleanup internal data

2016-05-02 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15267619#comment-15267619
 ] 

Henry Cai commented on KAFKA-3185:
--

Seconded. It's very painful for us to test our demo program: we have to either change our application id or manually remove the RocksDB or Kafka log files.

> Allow users to cleanup internal data
> 
>
> Key: KAFKA-3185
> URL: https://issues.apache.org/jira/browse/KAFKA-3185
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Blocker
>  Labels: user-experience
> Fix For: 0.10.1.0
>
>
> Currently the internal data is managed completely by Kafka Streams framework 
> and users cannot clean them up actively. This results in a bad out-of-the-box 
> user experience especially for running demo programs since it results 
> internal data (changelog topics, RocksDB files, etc) that need to be cleaned 
> manually. It will be better to add a
> {code}
> KafkaStreams.cleanup()
> {code}
> function call to clean up these internal data programmatically.





[jira] [Commented] (KAFKA-3101) Optimize Aggregation Outputs

2016-04-20 Thread Henry Cai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250977#comment-15250977
 ] 

Henry Cai commented on KAFKA-3101:
--

We really need this batching/buffering feature in order to adopt Kafka Streams; otherwise the output rate from the aggregation store is too high. Any idea when this will be implemented?

Another use case is similar to this: we have a left outer join between two streams:

INSERT INTO C
SELECT a, b
FROM A
LEFT OUTER JOIN B
ON a.id = b.id

On the output stream, we might see (a, null) followed by (a, b), which cancels the (a, null). To reduce this kind of churn, we could have a policy of a 15-minute buffer: if we don't see (a, b) within 15 minutes, then we emit (a, null).

Hopefully your solution for buffering aggregation output can solve the left outer join case as well. The other workaround would be to delay the B stream by 15 minutes, which also needs a buffering mechanism.
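A rough sketch of the kind of buffering meant here (using the later Processor API; LeftRecord, JoinedRecord, the arrivalMs field, and the "pending-left" store name are all invented for illustration):

{code}
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Rough sketch only. Left-side records are parked in a store instead of emitting (a, null)
// immediately; a punctuator emits (a, null) only for entries older than 15 minutes.
public class DelayedLeftEmitProcessor implements Processor<String, LeftRecord, String, JoinedRecord> {
    private ProcessorContext<String, JoinedRecord> context;
    private KeyValueStore<String, LeftRecord> pendingLeft;

    @Override
    public void init(final ProcessorContext<String, JoinedRecord> context) {
        this.context = context;
        this.pendingLeft = context.getStateStore("pending-left");
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, this::emitExpired);
    }

    @Override
    public void process(final Record<String, LeftRecord> record) {
        // Buffer instead of emitting (a, null) right away; a matching right-side record
        // arriving later would remove the entry (not shown).
        pendingLeft.put(record.key(), record.value());
    }

    private void emitExpired(final long now) {
        try (KeyValueIterator<String, LeftRecord> it = pendingLeft.all()) {
            while (it.hasNext()) {
                KeyValue<String, LeftRecord> entry = it.next();
                if (now - entry.value.arrivalMs >= Duration.ofMinutes(15).toMillis()) {
                    context.forward(new Record<>(entry.key, new JoinedRecord(entry.value, null), now));
                    pendingLeft.delete(entry.key);
                }
            }
        }
    }
}
{code}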


> Optimize Aggregation Outputs
> 
>
> Key: KAFKA-3101
> URL: https://issues.apache.org/jira/browse/KAFKA-3101
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bill Bejeck
>  Labels: architecture
> Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / 
> Windowed Stream Aggregations. For example, say we have a sequence of 
> aggregate outputs computed from the input stream (assuming there is no agg 
> value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue, oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> which could cost a lot of CPU overhead computing the intermediate results. 
> Instead if we can let the underlying state store to "remember" the last 
> emitted old value, we can reduce the number of emits based on some configs. 
> More specifically, we can add one more field in the KV store engine storing 
> the last emitted old value, which only get updated when we emit to the 
> downstream processor. For example:
> At Beginning: 
> Store: key => empty (no agg values yet)
> V1 computed: 
> Update Both in Store: key => (V1, V1), Emit <V1, null>
> V2 computed: 
> Update NewValue in Store: key => (V2, V1), No Emit
> V3 computed: 
> Update NewValue in Store: key => (V3, V1), No Emit
> V4 computed: 
> Update Both in Store: key => (V4, V4), Emit <V4, V1>
> V5 computed: 
> Update NewValue in Store: key => (V5, V4), No Emit
> One more thing to consider is that, we need a "closing" time control on the 
> not-yet-emitted keys; when some time has elapsed (or the window is to be 
> closed), we need to check for any key if their current materialized pairs 
> have not been emitted (for example <V5, V4> in the above example). 





[jira] [Updated] (KAFKA-3595) Add capability to specify replication compact option for stream store

2016-04-20 Thread Henry Cai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Cai updated KAFKA-3595:
-
Description: 
Currently, state store replication always goes through a compacted Kafka topic. For some state stores, e.g. JoinWindow, there are no duplicates in the store, so there is not much benefit in using a compacted topic.

The problem with using a compacted topic is that the records can stay on the Kafka broker forever. In my use case the key is ad_id, which keeps incrementing and is not bounded, so I am worried the disk usage on the broker for that topic will grow forever.

I think we either need the capability to purge the compacted records on the broker, or the ability to specify a different compaction option for state store replication.
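A sketch of the kind of knob meant here, expressed with the per-store changelog config override that later Streams versions expose (the store name and retention value are just examples):

{code}
// Sketch using the later Materialized API; store name and retention are examples only.
Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
changelogConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Duration.ofDays(1).toMillis()));

Materialized<String, Long, WindowStore<Bytes, byte[]>> materialized =
        Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("join-window-store")
                .withLoggingEnabled(changelogConfig);  // delete-based changelog instead of compaction
{code}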

  was:
Currently in Kafka Streams, the way the windows are expired in RocksDB is 
triggered by new event insertion.  When a window is created at T0 with 10 
minutes retention, when we saw a new record coming with event timestamp T0 + 10 
+1, we will expire that window (remove it) out of RocksDB.

In the real world, it's very easy to see event coming with future timestamp (or 
out-of-order events coming with big time gaps between events), this way of 
retiring a window based on one event's event timestamp is dangerous.  I think 
at least we need to consider both the event's event time and server/stream time 
elapse.


> Add capability to specify replication compact option for stream store
> -
>
> Key: KAFKA-3595
> URL: https://issues.apache.org/jira/browse/KAFKA-3595
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Henry Cai
>Assignee: Guozhang Wang
>Priority: Minor
>
> Currently state store replication always go through a compact kafka topic. 
> For some state stores, e.g. JoinWindow, there are no duplicates in the store, 
> there is not much benefit using a compacted topic.
> The problem of using compacted topic is the records can stay in kafka broker 
> forever. In my use case, my key is ad_id, it's incrementing all the time, not 
> bounded, I am worried the disk space on broker for that topic will go forever.
> I think we either need the capability to purge the compacted records on 
> broker, or allow us to specify different compact option for state store 
> replication.





[jira] [Updated] (KAFKA-3596) Kafka Streams: Window expiration needs to consider more than event time

2016-04-20 Thread Henry Cai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Cai updated KAFKA-3596:
-
Description: 
Currently in Kafka Streams, window expiration in RocksDB is triggered by new event insertion. When a window is created at T0 with a 10-minute retention and we then see a new record with event timestamp T0 + 10 + 1, we expire that window (remove it) from RocksDB.

In the real world it's very easy to see events arriving with future timestamps (or out-of-order events with big time gaps between them), so retiring a window based on a single event's timestamp is dangerous. I think we need to consider at least both the event's event time and the elapsed server/stream time.
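A minimal sketch of the suggested condition (all names invented; not the actual window store code):

{code}
// Sketch only: expire a window when BOTH the observed event time and the stream/system
// time have moved past its retention, instead of trusting a single event's timestamp.
boolean eventTimeExpired  = maxObservedEventTimeMs >= windowEndMs + retentionMs;
boolean streamTimeExpired = currentStreamTimeMs    >= windowEndMs + retentionMs;
if (eventTimeExpired && streamTimeExpired) {
    removeWindow(windowKey);  // hypothetical removal of the expired window from RocksDB
}
{code}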

  was:
Currently state store replication always go through a compact kafka topic.  For 
some state stores, e.g. JoinWindow, there are no duplicates in the store, there 
is not much benefit using a compacted topic.

The problem of using compacted topic is the records can stay in kafka broker 
forever.  In my use case, my key is ad_id, it's incrementing all the time, not 
bounded, I am worried the disk space on broker for that topic will go forever.

I think we either need the capability to purge the compacted records on broker, 
or allow us to specify different compact option for state store replication.


> Kafka Streams: Window expiration needs to consider more than event time
> ---
>
> Key: KAFKA-3596
> URL: https://issues.apache.org/jira/browse/KAFKA-3596
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Henry Cai
>Assignee: Guozhang Wang
>Priority: Minor
>
> Currently in Kafka Streams, the way the windows are expired in RocksDB is 
> triggered by new event insertion.  When a window is created at T0 with 10 
> minutes retention, when we saw a new record coming with event timestamp T0 + 
> 10 +1, we will expire that window (remove it) out of RocksDB.
> In the real world, it's very easy to see event coming with future timestamp 
> (or out-of-order events coming with big time gaps between events), this way 
> of retiring a window based on one event's event timestamp is dangerous.  I 
> think at least we need to consider both the event's event time and 
> server/stream time elapse.





[jira] [Updated] (KAFKA-3595) Add capability to specify replication compact option for stream store

2016-04-20 Thread Henry Cai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Cai updated KAFKA-3595:
-
Description: 
Currently in Kafka Streams, the way the windows are expired in RocksDB is 
triggered by new event insertion.  When a window is created at T0 with 10 
minutes retention, when we saw a new record coming with event timestamp T0 + 10 
+1, we will expire that window (remove it) out of RocksDB.

In the real world, it's very easy to see event coming with future timestamp (or 
out-of-order events coming with big time gaps between events), this way of 
retiring a window based on one event's event timestamp is dangerous.  I think 
at least we need to consider both the event's event time and server/stream time 
elapse.

  was:
Currently state store replication always go through a compact kafka topic.  For 
some state stores, e.g. JoinWindow, there are no duplicates in the store, there 
is not much benefit using a compacted topic.

The problem of using compacted topic is the records can stay in kafka broker 
forever.  In my use case, my key is ad_id, it's incrementing all the time, not 
bounded, I am worried the disk space on broker for that topic will go forever.

I think we either need the capability to purge the compacted records on broker, 
or allow us to specify different compact option for state store replication.


> Add capability to specify replication compact option for stream store
> -
>
> Key: KAFKA-3595
> URL: https://issues.apache.org/jira/browse/KAFKA-3595
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Henry Cai
>Assignee: Guozhang Wang
>Priority: Minor
>
> Currently in Kafka Streams, the way the windows are expired in RocksDB is 
> triggered by new event insertion.  When a window is created at T0 with 10 
> minutes retention, when we saw a new record coming with event timestamp T0 + 
> 10 +1, we will expire that window (remove it) out of RocksDB.
> In the real world, it's very easy to see event coming with future timestamp 
> (or out-of-order events coming with big time gaps between events), this way 
> of retiring a window based on one event's event timestamp is dangerous.  I 
> think at least we need to consider both the event's event time and 
> server/stream time elapse.





[jira] [Created] (KAFKA-3596) Kafka Streams: Window expiration needs to consider more than event time

2016-04-20 Thread Henry Cai (JIRA)
Henry Cai created KAFKA-3596:


 Summary: Kafka Streams: Window expiration needs to consider more 
than event time
 Key: KAFKA-3596
 URL: https://issues.apache.org/jira/browse/KAFKA-3596
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Henry Cai
Assignee: Guozhang Wang
Priority: Minor


Currently state store replication always go through a compact kafka topic.  For 
some state stores, e.g. JoinWindow, there are no duplicates in the store, there 
is not much benefit using a compacted topic.

The problem of using compacted topic is the records can stay in kafka broker 
forever.  In my use case, my key is ad_id, it's incrementing all the time, not 
bounded, I am worried the disk space on broker for that topic will go forever.

I think we either need the capability to purge the compacted records on broker, 
or allow us to specify different compact option for state store replication.





[jira] [Updated] (KAFKA-3595) Add capability to specify replication compact option for stream store

2016-04-20 Thread Henry Cai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Cai updated KAFKA-3595:
-
Description: 
Currently state store replication always go through a compact kafka topic.  For 
some state stores, e.g. JoinWindow, there are no duplicates in the store, there 
is not much benefit using a compacted topic.

The problem of using compacted topic is the records can stay in kafka broker 
forever.  In my use case, my key is ad_id, it's incrementing all the time, not 
bounded, I am worried the disk space on broker for that topic will go forever.

I think we either need the capability to purge the compacted records on broker, 
or allow us to specify different compact option for state store replication.

  was:Add the ability to record metrics in the serializer/deserializer 
components. As it stands, I cannot record latency/sensor metrics since the API 
does not provide the context at the serde levels. Exposing the ProcessorContext 
at this level may not be the solution; but perhaps change the configure method 
to take a different config or init context and make the StreamMetrics available 
in that context along with config information.


> Add capability to specify replication compact option for stream store
> -
>
> Key: KAFKA-3595
> URL: https://issues.apache.org/jira/browse/KAFKA-3595
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Henry Cai
>Assignee: Guozhang Wang
>Priority: Minor
>
> Currently state store replication always go through a compact kafka topic.  
> For some state stores, e.g. JoinWindow, there are no duplicates in the store, 
> there is not much benefit using a compacted topic.
> The problem of using compacted topic is the records can stay in kafka broker 
> forever.  In my use case, my key is ad_id, it's incrementing all the time, 
> not bounded, I am worried the disk space on broker for that topic will go 
> forever.
> I think we either need the capability to purge the compacted records on 
> broker, or allow us to specify different compact option for state store 
> replication.





[jira] [Updated] (KAFKA-3595) Add capability to specify replication compact option for stream store

2016-04-20 Thread Henry Cai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Cai updated KAFKA-3595:
-
Affects Version/s: (was: 0.9.0.1)
   0.10.1.0

> Add capability to specify replication compact option for stream store
> -
>
> Key: KAFKA-3595
> URL: https://issues.apache.org/jira/browse/KAFKA-3595
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Henry Cai
>Assignee: Guozhang Wang
>Priority: Minor
>
> Add the ability to record metrics in the serializer/deserializer components. 
> As it stands, I cannot record latency/sensor metrics since the API does not 
> provide the context at the serde levels. Exposing the ProcessorContext at 
> this level may not be the solution; but perhaps change the configure method 
> to take a different config or init context and make the StreamMetrics 
> available in that context along with config information.





[jira] [Updated] (KAFKA-3595) Add capability to specify replication compact option for stream store

2016-04-20 Thread Henry Cai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henry Cai updated KAFKA-3595:
-
Issue Type: Improvement  (was: New Feature)

> Add capability to specify replication compact option for stream store
> -
>
> Key: KAFKA-3595
> URL: https://issues.apache.org/jira/browse/KAFKA-3595
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Henry Cai
>Assignee: Guozhang Wang
>Priority: Minor
>
> Add the ability to record metrics in the serializer/deserializer components. 
> As it stands, I cannot record latency/sensor metrics since the API does not 
> provide the context at the serde levels. Exposing the ProcessorContext at 
> this level may not be the solution; but perhaps change the configure method 
> to take a different config or init context and make the StreamMetrics 
> available in that context along with config information.





[jira] [Created] (KAFKA-3595) Add capability to specify replication compact option for stream store

2016-04-20 Thread Henry Cai (JIRA)
Henry Cai created KAFKA-3595:


 Summary: Add capability to specify replication compact option for 
stream store
 Key: KAFKA-3595
 URL: https://issues.apache.org/jira/browse/KAFKA-3595
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 0.9.0.1
Reporter: Henry Cai
Assignee: Guozhang Wang
Priority: Minor


Add the ability to record metrics in the serializer/deserializer components. As 
it stands, I cannot record latency/sensor metrics since the API does not 
provide the context at the serde levels. Exposing the ProcessorContext at this 
level may not be the solution; but perhaps change the configure method to take 
a different config or init context and make the StreamMetrics available in that 
context along with config information.


