[jira] [Updated] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process
[ https://issues.apache.org/jira/browse/KAFKA-3904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henry Cai updated KAFKA-3904: - Description: I noticed that when my application had been running for a long time (> 1 day), I would get a 'Too many open files' error. I used 'lsof' to list all the file descriptors used by the process; the count was over 32K, but most of them belong to the .lock file, e.g. the same lock file shows up 2700 times. I looked at the code, and I think the problem is in:

File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();

Each time new RandomAccessFile is called, a new fd is created; we should probably either close or reuse this RandomAccessFile object.

lsof result:

java14799 hcai *740u REG9,00 2415928585 /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
java14799 hcai *743u REG9,00 2415928585 /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
java14799 hcai *746u REG9,00 2415928585 /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock
java14799 hcai *755u REG9,00 2415928585 /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock

hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16 | wc
2709 24381 319662

was: Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, or the default values are directly used. We need to make them configurable for advanced users. For example, some default values may not work perfectly for some scenarios: https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576 One way of doing that is to introduce a "RocksDBStoreConfigs" object similar to "StreamsConfig", which defines all related rocksDB options configs that can be passed as key-value pairs to "StreamsConfig".
> File descriptor leaking (Too many open files) for long running stream process > - > > Key: KAFKA-3904 > URL: https://issues.apache.org/jira/browse/KAFKA-3904 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Henry Cai >Assignee: Henry Cai > Labels: api, newbie > > I noticed when my application was running long (> 1 day), I will get 'Too > many open files' error. > I used 'lsof' to list all the file descriptors used by the process, it's over > 32K, but most of them belongs to the .lock file, e.g. this same lock file > shows 2700 times. > I looked at the code, I think the problem is in: > File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME); > FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel(); > Each time new RandomAccessFile is called, a new fd will be created, we > probably should either close or reuse this RandomAccessFile object. > lsof result: > java14799 hcai *740u REG9,00 2415928585 > /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock > java14799 hcai *743u REG9,00 2415928585 > /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock > java14799 hcai *746u REG9,00 2415928585 > /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock > java14799 hcai *755u REG9,00 2415928585 > /mnt/stream/join/rocksdb/ads-demo-30/0_16/.lock > hcai@teststream02001:~$ lsof -p 14799 | grep lock | grep 0_16 | wc >2709 24381 319662 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
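One way to address the leak described above is to open the lock-file channel once per state directory and reuse it, closing it only when the directory is released. A minimal sketch of that idea in plain Java, assuming a hypothetical LockChannelCache helper (the class and method names are illustrative, not Kafka's actual fix):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: cache one FileChannel per lock file instead of
// opening a new RandomAccessFile (and thus a new fd) on every lock attempt.
class LockChannelCache {
    private final Map<String, FileChannel> channels = new HashMap<>();

    public synchronized FileChannel channelFor(File lockFile) throws IOException {
        FileChannel channel = channels.get(lockFile.getAbsolutePath());
        if (channel == null || !channel.isOpen()) {
            // Only path that creates a new fd; subsequent calls reuse it.
            channel = new RandomAccessFile(lockFile, "rw").getChannel();
            channels.put(lockFile.getAbsolutePath(), channel);
        }
        return channel;
    }

    // Close and forget the channel once the state directory is released,
    // so the fd count stays bounded by the number of live state dirs.
    public synchronized void release(File lockFile) throws IOException {
        FileChannel channel = channels.remove(lockFile.getAbsolutePath());
        if (channel != null) {
            channel.close();
        }
    }
}
```

With this shape, repeated lock attempts on the same state directory keep the fd count at one per directory instead of one per attempt.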
[jira] [Created] (KAFKA-3904) File descriptor leaking (Too many open files) for long running stream process
Henry Cai created KAFKA-3904: Summary: File descriptor leaking (Too many open files) for long running stream process Key: KAFKA-3904 URL: https://issues.apache.org/jira/browse/KAFKA-3904 Project: Kafka Issue Type: Bug Components: streams Reporter: Henry Cai Assignee: Henry Cai Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, or the default values are directly used. We need to make them configurable for advanced users. For example, some default values may not work perfectly for some scenarios: https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576 One way of doing that is to introduce a "RocksDBStoreConfigs" object similar to "StreamsConfig", which defines all related rocksDB options configs that can be passed as key-value pairs to "StreamsConfig". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: KTable.filter usage, memory consumption and materialized view semantics
Thanks! You can follow this step-by-step guidance to contribute to Kafka via github. https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest Guozhang On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome wrote: > I have a 1 liner solution for this in KTableFilter.java and about 5-6 lines > changes to existing unit test KTableFilterTest.testSendingOldValue. I > included those lines with context in the JIRA. I am struggling a bit with > github being new to it and how to do a proper pull request so hopefully > that can be followed up by you? I had the streams test suite pass aside for > a few cases that pertain specifically to this JIRA as assumptions have now > changed. > > On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang wrote: > > Hi Philippe, > > > > Great, since you agree with my reasonings, I have created a JIRA ticket > > for optimizing KTableFilter (feel free to pick it up if you are interested in > > contributing): > > > > https://issues.apache.org/jira/browse/KAFKA-3902 > > > > About case 3-c-1), what I meant is that since "predicate return true on > > both", > > the resulting pair would just be the same as the original pair. > > > > About KIP-63, itself is a rather big story, but it has one correspondence > > to this JIRA: with caching you can dedup some records with the same key, > > for example if the input records to the KTable are: > > > > <a: 1>, <a: 2>, <a: 3>, <a: 4>, <a: 5>, <a: 6>, ... > > > > And the KTable is materialized into a state store with a cache on top of it, > > then the resulting downstream could be: > > > > <a: {null -> 1}>, <a: {1 -> 6}> ... > > > > Instead of > > > > <a: {null -> 1}>, <a: {1 -> 2}>, <a: {2 -> 3}>, ... <a: {5 -> 6}> ... > > > > So if it is piped to a filter() operator, then even less data will be > > produced. > > > > Guozhang > > > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome > > wrote: > > > Yes, it looks very good.
Your detailed explanation appears compelling > > > enough to reveal that some of the details of the complexity of a > streams > > > system are probably inherent complexity (not that I dared assume it was > > > "easy" but I could afford to be conveniently unaware). It took me 30 > > > minutes to grasp this latest response. > > > > > > There might be a typo in your email for case 3.c.1) as I would think we > > > should send the most recent pair as opposed to original, in any event > it > > > does not materially impact your presentation. > > > > > > Your case 3a) is really what triggered my line of questioning and I > found > > > the current behaviour vexing as it may lead to some undesirable and > > > necessary filter (see Michael G. Noll's fix in UserRegionLambdaExample > at > > > the very end trying to weed out null) used to output to topic to > console. > > > Without looking at design, it seemed self-evident to me that the 3a) > > > behaviour had to be implemented ( from my point of view with the code > > > example I was looking at, it simply means never say to delete a key > that > > > was never created, simply don't "create a deleted" key). > > > > > > Likewise cases 3 b,c look very reasonable. > > > > > > Just out of curiosity, did you effectively just restate the essence of > > > KIP-63 in a more approachable language I could understand or is KIP-63 > > > really a different beast? > > > > > > > > > > > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang > > wrote: > > > > > > > Hello Philippe, > > > > > > > > Very good points, let me dump my thoughts about "KTable.filter" > > > > specifically and how we can improve on that: > > > > > > > > 1. Some context: when a KTable participates in a downstream operators > > > (e.g. > > > > if that operator is an aggregation), then we need to materialize this > > > > KTable and send both its old value as well as new value as a pair > {old > > -> > > > > new} to the downstream operator. 
In practice it usually needs to send the > > > > pair. > > > > So let's discuss them separately; take the following example source > > > > stream for your KTable > > > > <a: 1>, <a: 2>, ... > > > > When the KTable needs to be materialized, it will transform the source > > > > messages into the pairs of: > > > > <a: {null -> 1}>, <a: {1 -> 2}>, <a: {2 -> 3}> > > > > 2. If "send old value" is not enabled, then when the filter predicate > > > > returns false, we MUST send a <a: null> to the downstream operator to > > > > indicate that this key is being filtered in the table. Otherwise, for > > > > example if your filter is "value < 2", then the updated value <a: 2> will > > > > just be filtered, resulting in incorrect semantics. > > > > If it returns true we should still send the original <a: new_value> to > > > > downstream operators. > > > > 3. If "send old value" is enabled, then there are a couple of cases we can > > > > consider: > > > > a. If old value is <a: null> and new value is <a: not-null>, and > > > > the filter predicate return false for the new value, then in this case it > > > > is safe to optimize and not returning anything to the downstream operator, > > > > since in this case we know there is no value for the key previously anyways; > > > > otherwise we send the original pair.
Re: KTable.filter usage, memory consumption and materialized view semantics
I have a 1 liner solution for this in KTableFilter.java and about 5-6 lines changes to existing unit test KTableFilterTest.testSendingOldValue. I included those lines with context in the JIRA. I am struggling a bit with github being new to it and how to do a proper pull request so hopefully that can be followed up by you? I had the streams test suite pass aside for a few cases that pertain specifically to this JIRA as assumptions have now changed. On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang wrote: > Hi Philippe, > > Great, since you agree with my reasonings, I have created a JIRA ticket for > optimizing KTableFilter (feel free to pick it up if you are interested in > contributing): > > https://issues.apache.org/jira/browse/KAFKA-3902 > > About case 3-c-1), what I meant is that since "predicate return true on > both", > the resulting pair would just be the same as the original pair. > > About KIP-63, itself is a rather big story, but it has one correspondence > to this JIRA: with caching you can dedup some records with the same key, > for example if the input records to the KTable are: > > <a: 1>, <a: 2>, <a: 3>, <a: 4>, <a: 5>, <a: 6>, ... > > And the KTable is materialized into a state store with a cache on top of it, > then the resulting downstream could be: > > <a: {null -> 1}>, <a: {1 -> 6}> ... > > Instead of > > <a: {null -> 1}>, <a: {1 -> 2}>, <a: {2 -> 3}>, ... <a: {5 -> 6}> ... > > So if it is piped to a filter() operator, then even less data will be > produced. > > > Guozhang > > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome > > wrote: > > > Yes, it looks very good. Your detailed explanation appears compelling > > enough to reveal that some of the details of the complexity of a streams > > system are probably inherent complexity (not that I dared assume it was > > "easy" but I could afford to be conveniently unaware). It took me 30 > > minutes to grasp this latest response.
> > > > There might be a typo in your email for case 3.c.1) as I would think we > > should send the most recent pair as opposed to original, in any event it > > does not materially impact your presentation. > > > > Your case 3a) is really what triggered my line of questioning and I found > > the current behaviour vexing as it may lead to some undesirable and > > necessary filter (see Michael G. Noll's fix in UserRegionLambdaExample at > > the very end trying to weed out null) used to output to topic to console. > > Without looking at design, it seemed self-evident to me that the 3a) > > behaviour had to be implemented ( from my point of view with the code > > example I was looking at, it simply means never say to delete a key that > > was never created, simply don't "create a deleted" key). > > > > Likewise cases 3 b,c look very reasonable. > > > > Just out of curiosity, did you effectively just restate the essence of > > KIP-63 in a more approachable language I could understand or is KIP-63 > > really a different beast? > > > > > > > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang > wrote: > > > > > Hello Philippe, > > > > > > Very good points, let me dump my thoughts about "KTable.filter" > > > specifically and how we can improve on that: > > > > > > 1. Some context: when a KTable participates in a downstream operators > > (e.g. > > > if that operator is an aggregation), then we need to materialize this > > > KTable and send both its old value as well as new value as a pair {old > -> > > > new} to the downstream operator. In practice it usually needs to send > the > > > pair. > > > > > > So let's discuss about them separately, take the following example > source > > > stream for your KTable > > > > > > , , ... > > > > > > When the KTable needs to be materialized, it will transform the source > > > messages into the pairs of: > > > > > > 1}>, 2}>, 3}> > > > > > > 2. 
If "send old value" is not enabled, then when the filter predicate > > > returns false, we MUST send a <a: null> to the downstream operator to > > > indicate that this key is being filtered in the table. Otherwise, for > > > example if your filter is "value < 2", then the updated value <a: 2> will > > > just be filtered, resulting in incorrect semantics. > > > If it returns true we should still send the original <a: new_value> to > > > downstream operators. > > > 3. If "send old value" is enabled, then there are a couple of cases we can > > > consider: > > > a. If old value is <a: null> and new value is <a: not-null>, and > > > the filter predicate return false for the new value, then in this case it > > > is safe to optimize and not returning anything to the downstream operator, > > > since in this case we know there is no value for the key previously > > > anyways; otherwise we send the original pair. > > > b. If old value is <a: not-null> and new value is <a: null>, > > > indicating to delete this key, and the filter predicate return false for > > > the old value, then in this case it is safe to optimize and not returning > > > anything to the downstream operator, since we know that the old value has > > > already been filtered in a previous message; otherwise we send the original pair.
[jira] [Commented] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic
[ https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349932#comment-15349932 ] Phil Derome commented on KAFKA-3902: A one-liner fix, but I am unfamiliar with github and submitting a proper pull request. Code is below; I used 10.0.0 as a base. Files: KTableFilter.java (1 liner), KTableFilterTest.java (4 lines changed or so), MockProcessorSupplier.java (new 1 liner method)

KTableFilter.java:

private class KTableFilterProcessor extends AbstractProcessor<K, Change<V>> {
    @Override
    public void process(K key, Change<V> change) {
        V newValue = computeValue(key, change.newValue);
        V oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
        if (sendOldValues && oldValue == null && newValue == null)
            return; // unnecessary to forward here
        context().forward(key, new Change<>(newValue, oldValue));
    }
}

KTableFilterTest.java:

@Test
public void testSendingOldValue() throws IOException {
    KStreamBuilder builder = new KStreamBuilder();
    String topic1 = "topic1";

    KTableImpl<String, Integer, Integer> table1 =
            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
    KTableImpl<String, Integer, Integer> table2 =
            (KTableImpl<String, Integer, Integer>) table1.filter(
                    new Predicate<String, Integer>() {
                        @Override
                        public boolean test(String key, Integer value) {
                            return (value % 2) == 0;
                        }
                    });
    table2.enableSendingOldValues();

    MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
    MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
    builder.addProcessor("proc1", proc1, table1.name);
    builder.addProcessor("proc2", proc2, table2.name);

    driver = new KStreamTestDriver(builder, stateDir, null, null);

    driver.process(topic1, "A", 1);
    driver.process(topic1, "B", 1);
    driver.process(topic1, "C", 1);
    proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
    proc2.checkEmpty(); // we got nothing since all inputs are odd and filtered out

    driver.process(topic1, "A", 2);
    driver.process(topic1, "B", 2);
    proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
    proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); // we are informed of 2 making it in for both A and B

    driver.process(topic1, "A", 3);
    proc1.checkAndClearProcessResult("A:(3<-2)");
    proc2.checkAndClearProcessResult("A:(null<-2)"); // no change for B but A is deleted

    driver.process(topic1, "A", null);
    driver.process(topic1, "B", null);
    proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
    proc2.checkAndClearProcessResult("B:(null<-2)"); // B is deleted from source table1
}

MockProcessorSupplier.java:

public void checkEmpty() {
    assertEquals("the number of outputs:", 0, processed.size());
}

> Optimize KTable.filter() to reduce unnecessary traffic > -- > > Key: KAFKA-3902 > URL: https://issues.apache.org/jira/browse/KAFKA-3902 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: architecture, performance > > {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be > optimized to reduce unnecessary data traffic to downstream operators. More > specifically: > 1. Some context: when a KTable participates in downstream operators (e.g. > if that operator is an aggregation), then we need to materialize this KTable > and send both its old value as well as new value as a pair {old -> new} to > the downstream operator. In practice it usually needs to send the pair. > So let's discuss them separately; take the following example source > stream for your KTable > {{<a: 1>, <a: 2>, ...}} > When the KTable needs to be materialized, it will transform the source > messages into the pairs of: > {{<a: \{null -> 1\}>, <a: \{1 -> 2\}>, <a: \{2 -> 3\}>}} > 2. If "send old value" is not enabled, then when the filter predicate returns > false, we MUST send a <a: null> to the downstream operator to indicate that > this key is being filtered in the table. Otherwise, for example if your > filter is "value < 2", then the updated value <a: 2> will just be filtered, > resulting in incorrect semantics.
> If it returns true we should still send the original <a: new_value> to > downstream operators. > 3. If "send old value" is enabled, then there are a couple of cases we can > consider: > a. If old value is <a: null> and new value is <a: not-null>, and the > filter predicate return false for the new value, then in this case it is safe > to optimize and not returning anything to the downstream operator, since in > this case we know there is no value for the key previously anyways; otherwise we send the original pair.
[jira] [Issue Comment Deleted] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic
[ https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Phil Derome updated KAFKA-3902: --- Comment: was deleted (was: Can we remove in KTableFilterTest testSendingOldValue references to proc2 as a consequence of this ticket? Specifically the following ones (towards end of file): proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); proc2.checkAndClearProcessResult("A:(null<-2)"); proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)"); My tentative fix invalidates these ones.) > Optimize KTable.filter() to reduce unnecessary traffic > -- > > Key: KAFKA-3902 > URL: https://issues.apache.org/jira/browse/KAFKA-3902 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: architecture, performance > > {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be > optimized to reduce unnecessary data traffic to downstream operators. More > specifically: > 1. Some context: when a KTable participates in downstream operators (e.g. > if that operator is an aggregation), then we need to materialize this KTable > and send both its old value as well as new value as a pair {old -> new} to > the downstream operator. In practice it usually needs to send the pair. > So let's discuss them separately; take the following example source > stream for your KTable > {{<a: 1>, <a: 2>, ...}} > When the KTable needs to be materialized, it will transform the source > messages into the pairs of: > {{<a: \{null -> 1\}>, <a: \{1 -> 2\}>, <a: \{2 -> 3\}>}} > 2. If "send old value" is not enabled, then when the filter predicate returns > false, we MUST send a <a: null> to the downstream operator to indicate that > this key is being filtered in the table. Otherwise, for example if your > filter is "value < 2", then the updated value <a: 2> will just be filtered, > resulting in incorrect semantics.
> If it returns true we should still send the original <a: new_value> to > downstream operators. > 3. If "send old value" is enabled, then there are a couple of cases we can > consider: > a. If old value is <a: null> and new value is <a: not-null>, and the > filter predicate return false for the new value, then in this case it is safe > to optimize and not returning anything to the downstream operator, since in > this case we know there is no value for the key previously anyways; otherwise > we send the original pair. > b. If old value is <a: not-null> and new value is <a: null>, > indicating to delete this key, and the filter predicate return false for the > old value, then in this case it is safe to optimize and not returning > anything to the downstream operator, since we know that the old value has > already been filtered in a previous message; otherwise we send the original > pair. > c. If both old and new values are not null, and: > 1) predicate return true on both, send the original pair; > 2) predicate return false on both, we can optimize and do not send > anything; > 3) predicate return true on old and false on new, send the <key: \{old -> null\}>; > 4) predicate return false on old and true on new, send the <key: \{null -> new\}>; -- This message was sent by Atlassian JIRA (v6.3.4#6332)
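The case analysis above (cases 2 and 3a through 3c) can be condensed into a single decision function. Below is a hedged sketch in plain Java: the class and method names are ours, Integer values stand in for the generic value type, and a null filtered value means "no value for this key"; it is an illustration of the logic, not the actual KTableFilter code.

```java
import java.util.function.Predicate;

// Illustrative decision logic for KTable.filter forwarding. We filter each
// side of the {old -> new} pair independently, then skip forwarding only
// when "send old values" is on and both sides filtered to null.
class FilterForwarding {
    // Returns a two-element {oldFiltered, newFiltered} array to forward,
    // or null when nothing needs to be sent downstream.
    public static Integer[] forwardedPair(boolean sendOldValues,
                                          Integer oldValue, Integer newValue,
                                          Predicate<Integer> predicate) {
        Integer newFiltered = (newValue != null && predicate.test(newValue)) ? newValue : null;
        Integer oldFiltered = (sendOldValues && oldValue != null && predicate.test(oldValue))
                ? oldValue : null;
        if (sendOldValues && oldFiltered == null && newFiltered == null) {
            return null; // cases 3a, 3b, 3c.2: downstream already sees "no value"
        }
        // Case 2 (sendOldValues off): a failed predicate still forwards
        // {null, null} so downstream learns the key was filtered out.
        return new Integer[] { oldFiltered, newFiltered };
    }
}
```

Note that with "send old value" disabled, a failed predicate still produces a forwarded null (case 2), which is exactly why the optimization only kicks in when old values are tracked.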
[GitHub] kafka pull request #1554: MINOR: Follow-up from KAFKA-3842 with suggested fi...
GitHub user bbejeck opened a pull request: https://github.com/apache/kafka/pull/1554 MINOR: Follow-up from KAFKA-3842 with suggested fixes to creating tem… …p directories, waitForCondition You can merge this pull request into a Git repository by running: $ git pull https://github.com/bbejeck/kafka follow_up_for_KAFKA-3842 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1554.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1554 commit 48336f953a8f93a705aa756aa0b95ef356599ee0 Author: bbejeck Date: 2016-06-26T01:24:08Z MINOR: Follow-up from KAFKA-3842 with suggested fixes to creating temp directories, waitForCondition --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-3842) Add Helper Functions Into TestUtils
[ https://issues.apache.org/jira/browse/KAFKA-3842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349914#comment-15349914 ] ASF GitHub Bot commented on KAFKA-3842: --- GitHub user bbejeck opened a pull request: https://github.com/apache/kafka/pull/1554 MINOR: Follow-up from KAFKA-3842 with suggested fixes to creating tem… …p directories, waitForCondition You can merge this pull request into a Git repository by running: $ git pull https://github.com/bbejeck/kafka follow_up_for_KAFKA-3842 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1554.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1554 commit 48336f953a8f93a705aa756aa0b95ef356599ee0 Author: bbejeck Date: 2016-06-26T01:24:08Z MINOR: Follow-up from KAFKA-3842 with suggested fixes to creating temp directories, waitForCondition > Add Helper Functions Into TestUtils > --- > > Key: KAFKA-3842 > URL: https://issues.apache.org/jira/browse/KAFKA-3842 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.1.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck > Fix For: 0.10.1.0 > > > Per guidance from [~guozhang] from PR #1477 move helper functions from > RegexSourceIntegrationTest (getProducerConfig, getConsumerConfig, > getStreamsConfig into TestUtils and parameterize as appropriate. Also look > into adding a {{waitUntil(Condition condition)}} type construct to wait for a > condition to be met without relying on using Thread.sleep -- This message was sent by Atlassian JIRA (v6.3.4#6332)
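The `waitUntil(Condition condition)` construct mentioned in the ticket, polling with a bounded timeout instead of a fixed Thread.sleep, can be sketched as below. This is an illustrative assumption of the shape, not the actual TestUtils API; the class, interface, and poll interval are ours.

```java
// Hedged sketch of a waitUntil(Condition)-style helper: poll a condition
// until it holds or a deadline passes, rather than sleeping a fixed time.
class WaitUtil {
    public interface Condition {
        boolean conditionMet();
    }

    public static void waitForCondition(Condition condition, long maxWaitMs, String errorMessage)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.conditionMet()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(errorMessage); // condition never became true
            }
            Thread.sleep(10); // short poll interval, bounded by the deadline
        }
    }
}
```

The advantage over a bare Thread.sleep is that fast machines return as soon as the condition holds, while slow machines get the full timeout before the test fails.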
[jira] [Commented] (KAFKA-3902) Optimize KTable.filter() to reduce unnecessary traffic
[ https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349872#comment-15349872 ] Phil Derome commented on KAFKA-3902: Can we remove in KTableFilterTest testSendingOldValue references to proc2 as a consequence of this ticket? Specifically the following ones (towards end of file): proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); proc2.checkAndClearProcessResult("A:(null<-2)"); proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)"); My tentative fix invalidates these ones. > Optimize KTable.filter() to reduce unnecessary traffic > -- > > Key: KAFKA-3902 > URL: https://issues.apache.org/jira/browse/KAFKA-3902 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang > Labels: architecture, performance > > {{KTable.filter()}} operator is implemented in {{KTableFilter}}, and can be > optimized to reduce unnecessary data traffic to downstream operators. More > specifically: > 1. Some context: when a KTable participates in downstream operators (e.g. > if that operator is an aggregation), then we need to materialize this KTable > and send both its old value as well as new value as a pair {old -> new} to > the downstream operator. In practice it usually needs to send the pair. > So let's discuss them separately; take the following example source > stream for your KTable > {{<a: 1>, <a: 2>, ...}} > When the KTable needs to be materialized, it will transform the source > messages into the pairs of: > {{<a: \{null -> 1\}>, <a: \{1 -> 2\}>, <a: \{2 -> 3\}>}} > 2. If "send old value" is not enabled, then when the filter predicate returns > false, we MUST send a <a: null> to the downstream operator to indicate that > this key is being filtered in the table. Otherwise, for example if your > filter is "value < 2", then the updated value <a: 2> will just be filtered, > resulting in incorrect semantics.
> If it returns true we should still send the original <a: new_value> to > downstream operators. > 3. If "send old value" is enabled, then there are a couple of cases we can > consider: > a. If old value is <a: null> and new value is <a: not-null>, and the > filter predicate return false for the new value, then in this case it is safe > to optimize and not returning anything to the downstream operator, since in > this case we know there is no value for the key previously anyways; otherwise > we send the original pair. > b. If old value is <a: not-null> and new value is <a: null>, > indicating to delete this key, and the filter predicate return false for the > old value, then in this case it is safe to optimize and not returning > anything to the downstream operator, since we know that the old value has > already been filtered in a previous message; otherwise we send the original > pair. > c. If both old and new values are not null, and: > 1) predicate return true on both, send the original pair; > 2) predicate return false on both, we can optimize and do not send > anything; > 3) predicate return true on old and false on new, send the <key: \{old -> null\}>; > 4) predicate return false on old and true on new, send the <key: \{null -> new\}>; -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3740) Add configs for RocksDBStores
[ https://issues.apache.org/jira/browse/KAFKA-3740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349772#comment-15349772 ] ASF GitHub Bot commented on KAFKA-3740: --- GitHub user HenryCaiHaiying opened a pull request: https://github.com/apache/kafka/pull/1553 KAFKA-3740: Add configs for RocksDBStore This is part I of the work to add the StreamsConfig to ProcessorContext. We need to access StreamsConfig in the ProcessorContext so that other components (e.g. RocksDBWindowStore or LRUCache) can retrieve config parameters from the application. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HenryCaiHaiying/kafka config Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1553.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1553 commit 12973c2d090268db416417b18dfbbe3d96f6a9d5 Author: Henry Cai Date: 2016-06-25T20:19:42Z KAFKA-3740: Add configs for RocksDBStore This is part I of the work to add the StreamsConfig to ProcessorContext. We need to access StreamsConfig in the ProcessorContext so that other components (e.g. RocksDBWindowStore or LRUCache) can retrieve config parameters from the application. > Add configs for RocksDBStores > - > > Key: KAFKA-3740 > URL: https://issues.apache.org/jira/browse/KAFKA-3740 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Henry Cai > Labels: api, newbie > > Today most of the rocksDB configs are hard written inside {{RocksDBStore}}, > or the default values are directly used. We need to make them configurable > for advanced users.
For example, some default values may not work perfectly > for some scenarios: > https://github.com/HenryCaiHaiying/kafka/commit/ccc4e25b110cd33eea47b40a2f6bf17ba0924576 > > One way of doing that is to introduce a "RocksDBStoreConfigs" object similar > to "StreamsConfig", which defines all related rocksDB options configs that > can be passed as key-value pairs to "StreamsConfig". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] kafka pull request #1553: KAFKA-3740: Add configs for RocksDBStore
GitHub user HenryCaiHaiying opened a pull request: https://github.com/apache/kafka/pull/1553 KAFKA-3740: Add configs for RocksDBStore This is part I of the work to add the StreamsConfig to ProcessorContext. We need to access StreamsConfig in the ProcessorContext so that other components (e.g. RocksDBWindowStore or LRUCache) can retrieve config parameters from the application. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HenryCaiHaiying/kafka config Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1553.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1553 commit 12973c2d090268db416417b18dfbbe3d96f6a9d5 Author: Henry Cai Date: 2016-06-25T20:19:42Z KAFKA-3740: Add configs for RocksDBStore This is part I of the work to add the StreamsConfig to ProcessorContext. We need to access StreamsConfig in the ProcessorContext so that other components (e.g. RocksDBWindowStore or LRUCache) can retrieve config parameters from the application. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
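The "RocksDBStoreConfigs passed as key-value pairs to StreamsConfig" idea from KAFKA-3740 could look roughly like the following. This is a hypothetical illustration only: the `rocksdb.config.*` key names below are invented for the sketch and are not actual Kafka configuration names.

```java
import java.util.Properties;

// Illustrative sketch: RocksDB tuning options carried as plain key-value
// pairs alongside the usual streams configs, so a store implementation
// could later look them up from the ProcessorContext's config.
class RocksDbConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        // Standard streams client configs.
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Invented RocksDB option keys, per the "RocksDBStoreConfigs" idea:
        props.put("rocksdb.config.block.cache.size", "104857600");  // 100 MB
        props.put("rocksdb.config.write.buffer.size", "33554432");  // 32 MB
        return props;
    }
}
```

The appeal of this shape is that advanced users tune RocksDB without a new API surface; stores that don't recognize a `rocksdb.config.*` key simply fall back to their defaults.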
[jira] [Created] (KAFKA-3903) Convert tests to use static helper methods for Consumer/Producer/StreamsConfigs setup
Bill Bejeck created KAFKA-3903:
----------------------------------

             Summary: Convert tests to use static helper methods for Consumer/Producer/StreamsConfigs setup
                 Key: KAFKA-3903
                 URL: https://issues.apache.org/jira/browse/KAFKA-3903
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Bill Bejeck
            Assignee: Bill Bejeck
             Fix For: 0.10.1.0

There are several unit/integration tests where we create Consumer/Producer/Streams configs. All of these calls essentially create the same configs over and over. We should migrate these config setups to use the static helper methods TestUtils.consumerConfigs, TestUtils.producerConfigs, and StreamsTestUtils.getStreamsConfigs.
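The pattern the ticket describes, replacing per-test boilerplate with one shared static helper, can be sketched as below. The exact signatures of TestUtils.consumerConfigs / TestUtils.producerConfigs / StreamsTestUtils.getStreamsConfigs live in the Kafka test sources, so the helper here is only an illustrative stand-in (the config key names are the real client config strings):

```java
import java.util.Properties;

public class ConfigHelperSketch {
    // Illustrative stand-in for TestUtils.consumerConfigs(...): one place that
    // builds the Properties boilerplate each consumer test currently repeats inline.
    static Properties consumerConfigs(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Each test now makes one call instead of re-creating the same Properties.
        Properties props = consumerConfigs("localhost:9092", "test-group");
        System.out.println(props.getProperty("group.id")); // prints test-group
    }
}
```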
Re: delay of producer and consumer in kafka 0.9 is too big to be accepted
Can you sanity-check this with the end-to-end latency test that ships with Kafka in the tools package?
https://apache.googlesource.com/kafka/+/1769642bb779921267bd57d3d338591dbdf33842/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala

On Saturday, June 25, 2016, Kafka wrote:
> [quoted text of the original message trimmed]
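For reference, the three acks modes compared in this thread differ in how many acknowledgements the producer waits for per send, which is why acks=-1 (wait for all in-sync replicas) costs the most latency. A minimal producer-config sketch; "acks" is the real producer config key, while the broker address is a placeholder:

```java
import java.util.Properties;

public class AcksConfigSketch {
    // Builds producer configs for the three acks modes measured in the thread.
    static Properties producerConfig(String acks) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder address
        props.put("acks", acks); // "0": no ack, "1": leader only, "-1"/"all": all in-sync replicas
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        for (String acks : new String[] {"0", "1", "-1"}) {
            System.out.println("acks=" + producerConfig(acks).getProperty("acks"));
        }
    }
}
```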
delay of producer and consumer in kafka 0.9 is too big to be accepted
Hi all,

My Kafka cluster is composed of three brokers, each with an 8-core CPU, 8 GB of memory, and a 1 Gb network card.

With the Java async client, I sent 100 messages of 1024 bytes each, with a gap of 20 us between sends. The consumer's config is as follows: fetch.min.bytes is set to 1 and fetch.wait.max.ms is set to 100. To avoid clock discrepancies between two machines, I start the producer and the consumer on the same machine; that machine has enough resources to satisfy both clients.

I start the consumer before the producer on each test. With the sending timestamp embedded in each message, when the consumer receives the message I can compute the consumer delay by subtracting the sending timestamp from the current timestamp.

When I set acks to 0 and replica to 2, the average producer delay is 2.98 ms and the average consumer delay is 52.23 ms.
When I set acks to 1 and replica to 2, the average producer delay is 3.9 ms and the average consumer delay is 44.88 ms.
When I set acks to -1 and replica to 2, the average producer delay is 1782 ms and the average consumer delay is 1786 ms.

I have two doubts. The first is why my consumer delay with acks set to 0 is larger than the consumer delay with acks set to 1. The second is why the producer and consumer delays are so big when I set acks to -1; I think this delay cannot be accepted. I also found that this delay is amplified when sending more messages.

Any feedback is appreciated.
Thanks
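The delay measurement described above, embedding the send timestamp in each message and subtracting it from the receive timestamp on the consumer side, can be sketched as follows. The Kafka send/poll calls are omitted, so this shows only the payload encoding and the subtraction:

```java
import java.nio.ByteBuffer;

public class LatencySketch {
    // Producer side: prefix the payload with the send timestamp (millis).
    static byte[] encode(long sendTsMillis, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + payload.length);
        buf.putLong(sendTsMillis);
        buf.put(payload);
        return buf.array();
    }

    // Consumer side: recover the send timestamp and compute the delay.
    static long delayMillis(byte[] message, long receiveTsMillis) {
        long sendTs = ByteBuffer.wrap(message).getLong();
        return receiveTsMillis - sendTs;
    }

    public static void main(String[] args) {
        long sendTs = System.currentTimeMillis();
        byte[] msg = encode(sendTs, new byte[1024]); // 1024-byte payload, as in the test
        long delay = delayMillis(msg, sendTs + 52);  // pretend receipt 52 ms later
        System.out.println(delay);                   // prints 52
    }
}
```

Note that this scheme only gives meaningful numbers when producer and consumer share a clock, which is exactly why the test runs both clients on the same machine.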