Re: Operator order
You can do conditional branching using KStream.branch(Predicate...). You can then merge multiple streams using KStreamBuilder.merge(KStream...).

-Yasuhiro

On Mon, Jun 20, 2016 at 4:45 AM, Jeyhun Karimov wrote:
> Hi Guozhang,
>
> Thank you for your reply. Yes, it is correct. Your solution is a match for
> my use case. I will try to use the topology you mentioned in a more dynamic
> way.
>
> Thanks
> Jeyhun
>
> On Mon, Jun 20, 2016 at 1:59 AM Guozhang Wang wrote:
>
> > Hello Jeyhun,
> >
> > Another way to do this "dynamic routing" is to specify your topology using
> > the lower-level processor API:
> >
> > http://docs.confluent.io/3.0.0/streams/developer-guide.html#processor-api
> >
> > More specifically, you can for example specify both A and D as parents of
> > E when adding processor E, and then in processor A you can use
> > "forward(K key, V value, String childName)" to pass the record to a
> > specific child (either B or E) by its processor name.
> >
> > As for TelegraphCQ and its underlying query processor (i.e. the Eddy
> > model, http://db.cs.berkeley.edu/papers/sigmod00-eddy.pdf), my
> > understanding is that it is conceptually any-to-any routable and the
> > query processor will try to schedule at a per-record granularity
> > depending on the query selectivity, etc. But this is not fully
> > controllable by the users. Is that correct?
> >
> > Guozhang
> >
> > On Sun, Jun 19, 2016 at 7:16 AM, Matthias J. Sax wrote:
> >
> > > Thanks for the clarification. I still don't have a better answer than
> > > before.
> > >
> > > How much overhead my suggestion adds is hard to predict. However, the
> > > filter operators will run in the same thread (it's more or less just
> > > another chained method call), so the overhead should not be too large.
> > > Furthermore, it should never be required to write the tagged records to
> > > Kafka -- thus, it would only be some main-memory overhead. But you
> > > would need to test and measure.
> > >
> > > -Matthias
> > >
> > > On 06/18/2016 08:13 PM, Jeyhun Karimov wrote:
> > > > Hi Matthias,
> > > >
> > > > Thank you for your answer. In my use case, depending on the statistics
> > > > of every operator, some tuples can skip specific operators, so that we
> > > > can get an approximate but faster result. I think this is somehow
> > > > similar to TelegraphCQ in the dynamism of operators.
> > > > In my case, the goal is to get rid of the transmission and processing
> > > > overhead of some tuples for some operators (at runtime) to get
> > > > approximate results. However, it seems the possible solution can bring
> > > > extra overhead to the system in some cases.
> > > >
> > > > Jeyhun
> > > >
> > > > On Sat, Jun 18, 2016 at 7:36 PM Matthias J. Sax <matth...@confluent.io>
> > > > wrote:
> > > >
> > > >> Hi Jeyhun,
> > > >>
> > > >> there is no support by the library itself. But you could build a
> > > >> custom solution by building the DAG with all required edges (i.e.,
> > > >> additional edges from A->E, B->sink, etc.). For this, each output
> > > >> message from A would be duplicated and sent to both B and E.
> > > >> Therefore, A should "tag" each message with the designated receiver
> > > >> (B or E), and you add an additional filter step on both edges (i.e.,
> > > >> a filter between A->F1->B and A->F2->E) that drops messages if the
> > > >> "tag" does not match the downstream operator.
> > > >>
> > > >> Does this make sense? Of course, depending on your use case, you
> > > >> might get a huge number of edges (plus filters) and your DAG might be
> > > >> quite complex. I don't see any other solution though.
> > > >>
> > > >> Hope this helps.
> > > >>
> > > >> One question though: how would changing the DAG at runtime help you?
> > > >> Do you mean you would dynamically change the edges A->B and A->sink?
> > > >> I guess this would be a very special pattern, and I doubt that any
> > > >> library or system can offer it.
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 06/18/2016 05:33 PM, Jeyhun Karimov wrote:
> > > >>> Hi community,
> > > >>>
> > > >>> Is there a way in Kafka Streams to change the order of operators at
> > > >>> runtime? For example, I have the operators
> > > >>>
> > > >>> Source->A->B->C->D->E->Sink
> > > >>>
> > > >>> and I want to forward some tuples from A to E, from B to Sink, etc.
> > > >>> As far as I know, the stream execution graph is computed at compile
> > > >>> time and does not change at runtime. Can there be an indirect
> > > >>> solution for this specific case?
> > > >>>
> > > >>> Jeyhun
> > > >>>
> > > >>
> > > >
> > > > --
> > > > -Cheers
> > > > Jeyhun
> >
> > --
> > -- Guozhang
>
> --
> -Cheers
> Jeyhun
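The tag-and-filter workaround described in this thread (A duplicates each output onto both edges, tags it with its designated receiver, and a per-edge filter drops mismatches) can be sketched without the Kafka Streams API. The following is a minimal pure-Java sketch of the idea only; `Tagged` and `edgeFilter` are hypothetical names, not library types.

```java
import java.util.ArrayList;
import java.util.List;

public class TagRouting {
    // A "tagged" record carries the name of its designated receiver.
    static final class Tagged {
        final String receiver;
        final String value;
        Tagged(String receiver, String value) { this.receiver = receiver; this.value = value; }
    }

    // The per-edge filter: keep only records addressed to this operator.
    static List<Tagged> edgeFilter(List<Tagged> input, String operatorName) {
        List<Tagged> out = new ArrayList<>();
        for (Tagged t : input) {
            if (t.receiver.equals(operatorName)) out.add(t);
        }
        return out;
    }

    public static void main(String[] args) {
        // A duplicates every output onto both edges; the tag decides the
        // real target, and each edge's filter drops the rest.
        List<Tagged> fromA = List.of(
            new Tagged("B", "tuple-1"),
            new Tagged("E", "tuple-2"),
            new Tagged("B", "tuple-3"));
        List<Tagged> toB = edgeFilter(fromA, "B"); // A -> F1 -> B
        List<Tagged> toE = edgeFilter(fromA, "E"); // A -> F2 -> E
        if (toB.size() != 2 || toE.size() != 1) throw new AssertionError();
        System.out.println("B gets " + toB.size() + " records, E gets " + toE.size());
    }
}
```

As Matthias notes, the filters run in the same thread as the rest of the chain, so the cost is per-record predicate evaluation plus the duplicated in-memory records, not extra Kafka writes.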
Re: KStream/KTable prioritization/initial offset
It may not be ideal, but there is a way to prioritize particular topics: set the record timestamps to zero using a custom TimestampExtractor. Kafka Streams tries to synchronize multiple streams using the extracted timestamps, so records with timestamp 0 have a greater chance of being processed earlier than others.

On Thu, Mar 24, 2016 at 6:57 PM, Greg Fodor wrote:
> Really digging Kafka Streams so far, nice work all. I'm interested in
> being able to materialize one or more KTables in full before the rest
> of the topology begins processing messages. This seems fundamentally
> useful since it allows you to get your database tables replicated up
> off the change stream topics from Connect before the stream processing
> workload starts.
>
> In Samza we have bootstrap streams and stream prioritization to help
> facilitate this. What seems desirable for Kafka Streams is:
>
> - Per-source prioritization (by defaulting to >0, setting the stream
> priority to 0 effectively bootstraps it.)
> - Per-source initial offset settings (earliest or latest, default to
> latest)
>
> To solve the KTable materialization problem, you'd set priority to 0
> for its source and the source offset setting to earliest.
>
> Right now it appears the only control you have for re-processing is
> AUTO_OFFSET_RESET_CONFIG, but I believe this is a global setting for
> the consumers, and hence, the entire job. Beyond that, I don't see any
> way to prioritize stream consumption at all, so your KTables will be
> getting materialized while the general stream processing work is
> running concurrently.
>
> I wanted to see if this case is actually supported already and I'm
> missing something, or if not, if these options make sense. If this
> seems reasonable and it's not too complicated, I could possibly try to
> get together a patch. If so, any tips on implementing this would be
> helpful as well. Thanks!
>
> -Greg
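The effect of the zero-timestamp trick can be illustrated outside Kafka: when a task has buffered records from several streams, it keeps consuming the record with the smallest extracted timestamp, so a stream whose extractor always returns 0 drains first. The following pure-Java sketch mimics that selection loop; it is an illustration of the behavior, not the actual Kafka Streams scheduling code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PriorityMerge {
    // Each stream is modeled as a queue of extracted timestamps; returns
    // labels in the order the task would process the records.
    static List<String> processingOrder(Deque<Long> table, Deque<Long> stream) {
        List<String> order = new ArrayList<>();
        while (!table.isEmpty() || !stream.isEmpty()) {
            boolean pickTable = !table.isEmpty()
                && (stream.isEmpty() || table.peek() <= stream.peek());
            if (pickTable) { table.poll(); order.add("table"); }
            else { stream.poll(); order.add("stream"); }
        }
        return order;
    }

    public static void main(String[] args) {
        // The "table" topic's custom TimestampExtractor returns 0 for every
        // record; the "stream" topic carries real event timestamps.
        Deque<Long> table = new ArrayDeque<>(List.of(0L, 0L, 0L));
        Deque<Long> stream = new ArrayDeque<>(List.of(100L, 200L));
        List<String> order = processingOrder(table, stream);
        // All zero-stamped table records are consumed before any stream record.
        if (!order.equals(List.of("table", "table", "table", "stream", "stream")))
            throw new AssertionError();
        System.out.println(order);
    }
}
```

As the reply notes, this only raises the *chance* of earlier processing: records that arrive after the bootstrap stream is drained are scheduled normally.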
[jira] [Created] (KAFKA-3395) prefix job id to internal topic names
Yasuhiro Matsuda created KAFKA-3395: --- Summary: prefix job id to internal topic names Key: KAFKA-3395 URL: https://issues.apache.org/jira/browse/KAFKA-3395 Project: Kafka Issue Type: Sub-task Components: kafka streams Affects Versions: 0.9.0.1 Reporter: Yasuhiro Matsuda Fix For: 0.10.0.0 Names of internal repartition topics are not prefixed by a job id right now. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3271) Notification upon unclean leader election
Yasuhiro Matsuda created KAFKA-3271: --- Summary: Notification upon unclean leader election Key: KAFKA-3271 URL: https://issues.apache.org/jira/browse/KAFKA-3271 Project: Kafka Issue Type: New Feature Components: clients, core Reporter: Yasuhiro Matsuda Priority: Minor

It is a legitimate restriction that unclean leader election results in some message loss. That said, it is always good to try to minimize the message loss. A notification of unclean leader election can reduce message loss in the following scenario.
1. The latest offset is L.
2. A consumer is at C, where C < L.
3. A slow broker (not in ISR) is at S, where S < C.
4. All brokers in ISR die.
5. The slow broker becomes the leader by unclean leader election.
6. Now the latest offset is S.
7. The new messages get offsets S, S+1, S+2, and so on.
Currently the consumer won't receive new messages with offsets between S and C. However, if the consumer is notified when unclean leader election happens and resets its offset to S, it can receive the new messages between S and C.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
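The consumer-side reaction the ticket proposes can be sketched as a one-line rule: on an unclean-election notification, a consumer positioned at C rewinds to the new leader's log end S when S < C, so the newly written offsets S..C are fetched instead of silently skipped. This is a hypothetical helper for illustration, not an existing client API.

```java
public class UncleanReset {
    // Where the consumer should fetch next after an unclean leader
    // election notification: rewind only if the new leader's log ends
    // before the consumer's current position.
    static long nextFetchOffset(long consumerOffset, long newLeaderLogEnd) {
        return Math.min(consumerOffset, newLeaderLogEnd);
    }

    public static void main(String[] args) {
        // Ticket's scenario: consumer at C=50, slow broker's log ends at S=30.
        if (nextFetchOffset(50L, 30L) != 30L) throw new AssertionError();
        // Leader's log end ahead of the consumer: keep the current position.
        if (nextFetchOffset(50L, 80L) != 50L) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Note the rewind is lossy by design: offsets 30..50 now hold different messages than the consumer previously saw, which is exactly the unclean-election trade-off the ticket describes.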
[jira] [Updated] (KAFKA-3262) Make KafkaStreams debugging friendly
[ https://issues.apache.org/jira/browse/KAFKA-3262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-3262: Description: Currently KafkaStreams polls records in the same thread as the data processing thread. This makes debugging user code, as well as KafkaStreams itself, difficult. When the thread is suspended by the debugger, the next heartbeat of the consumer tied to the thread won't be sent until the thread is resumed. This often results in missed heartbeats and causes a group rebalance, so it may well be a completely different context when the thread hits the breakpoint the next time. We should consider using separate threads for polling and processing. > Make KafkaStreams debugging friendly > > > Key: KAFKA-3262 > URL: https://issues.apache.org/jira/browse/KAFKA-3262 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams >Affects Versions: 0.9.1.0 > Reporter: Yasuhiro Matsuda > > Currently KafkaStreams polls records in the same thread as the data processing > thread. This makes debugging user code, as well as KafkaStreams itself, > difficult. When the thread is suspended by the debugger, the next heartbeat > of the consumer tied to the thread won't be sent until the thread is resumed. > This often results in missed heartbeats and causes a group rebalance, so it > may well be a completely different context when the thread hits the > breakpoint the next time. > We should consider using separate threads for polling and processing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
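The separation the ticket suggests can be sketched with a dedicated poll thread handing records to the processing thread through a queue; suspending the processing thread in a debugger then no longer blocks polling (and hence heartbeating). This is a hypothetical structure, not the actual StreamThread implementation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DecoupledPolling {
    public static void main(String[] args) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(16);

        Thread poller = new Thread(() -> {
            // Stand-in for the consumer.poll() loop; in the real client this
            // thread would also keep heartbeats flowing to the coordinator.
            for (int i = 0; i < 3; i++) {
                try { buffer.put("record-" + i); }
                catch (InterruptedException e) { return; }
            }
        });
        try {
            poller.start();
            poller.join();

            // The processing side: pausing *this* code at a breakpoint would
            // no longer stall the poll loop above.
            int processed = 0;
            while (!buffer.isEmpty()) { buffer.take(); processed++; }
            if (processed != 3) throw new AssertionError();
            System.out.println("processed " + processed);
        } catch (InterruptedException e) {
            throw new AssertionError(e);
        }
    }
}
```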
[jira] [Created] (KAFKA-3262) Make KafkaStreams debugging friendly
Yasuhiro Matsuda created KAFKA-3262: --- Summary: Make KafkaStreams debugging friendly Key: KAFKA-3262 URL: https://issues.apache.org/jira/browse/KAFKA-3262 Project: Kafka Issue Type: Sub-task Components: kafka streams Affects Versions: 0.9.1.0 Reporter: Yasuhiro Matsuda -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3245) need a way to specify the number of replicas for change log topics
Yasuhiro Matsuda created KAFKA-3245: --- Summary: need a way to specify the number of replicas for change log topics Key: KAFKA-3245 URL: https://issues.apache.org/jira/browse/KAFKA-3245 Project: Kafka Issue Type: Sub-task Components: kafka streams Affects Versions: 0.9.1.0 Reporter: Yasuhiro Matsuda Currently the number of replicas of auto-created change log topics is one. This makes stream processing not fault tolerant. A way to specify the number of replicas in config is desired. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3153) Serializer/Deserializer Registration and Type inference
Yasuhiro Matsuda created KAFKA-3153: --- Summary: Serializer/Deserializer Registration and Type inference Key: KAFKA-3153 URL: https://issues.apache.org/jira/browse/KAFKA-3153 Project: Kafka Issue Type: Sub-task Components: kafka streams Reporter: Yasuhiro Matsuda Assignee: Yasuhiro Matsuda Fix For: 0.9.1.0 This changes the way serializers/deserializers are selected by the framework. The new scheme requires the app dev to register serializers/deserializers for types using the API. The framework infers the type of data from the topology and uses the appropriate serializer/deserializer. This is best effort: type inference is not always possible due to Java's type erasure. If a type cannot be determined, user code can supply more information. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3108) KStream custom StreamPartitioner for windowed key
[ https://issues.apache.org/jira/browse/KAFKA-3108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-3108: Priority: Minor (was: Major) > KStream custom StreamPartitioner for windowed key > - > > Key: KAFKA-3108 > URL: https://issues.apache.org/jira/browse/KAFKA-3108 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams >Affects Versions: 0.9.1.0 > Reporter: Yasuhiro Matsuda > Assignee: Yasuhiro Matsuda >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3108) KStream custom StreamPartitioner for windowed key
[ https://issues.apache.org/jira/browse/KAFKA-3108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-3108: Summary: KStream custom StreamPartitioner for windowed key (was: KStream custom Partitioner for windowed key) > KStream custom StreamPartitioner for windowed key > - > > Key: KAFKA-3108 > URL: https://issues.apache.org/jira/browse/KAFKA-3108 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams >Affects Versions: 0.9.1.0 > Reporter: Yasuhiro Matsuda > Assignee: Yasuhiro Matsuda > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3108) KStream custom Partitioner for windowed key
Yasuhiro Matsuda created KAFKA-3108: --- Summary: KStream custom Partitioner for windowed key Key: KAFKA-3108 URL: https://issues.apache.org/jira/browse/KAFKA-3108 Project: Kafka Issue Type: Sub-task Components: kafka streams Affects Versions: 0.9.1.0 Reporter: Yasuhiro Matsuda Assignee: Yasuhiro Matsuda -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3060) Refactor MeteredXXStore
Yasuhiro Matsuda created KAFKA-3060: --- Summary: Refactor MeteredXXStore Key: KAFKA-3060 URL: https://issues.apache.org/jira/browse/KAFKA-3060 Project: Kafka Issue Type: Sub-task Components: kafka streams Affects Versions: 0.9.0.1 Reporter: Yasuhiro Matsuda Priority: Minor ** copied from a github comment by Guozhang Wang ** The original motivation of having the MeteredXXStore is to wrap all metrics / logging semantics into one place so they do not need to be re-implemented again, but this seems to be an obstacle with the current pattern now, for example MeteredWindowStore.putAndReturnInternalKey is only used for logging, and MeteredWindowStore.putInternal / MeteredWindowStore.getInternal are never used since only its inner will trigger this function. So how about refactoring this piece as follows: 1. WindowStore only expose two APIs: put(K, V) and get(K, long). 2. Add a RollingRocksDBStores that does not extend any interface, but only implements putInternal, getInternal and putAndReturnInternalKey that uses underlying RocksDBStore as Segments. 3. RocksDBWindowStore implements WindowStore with an RollingRocksDBStores inner. 4. Let MeteredXXStore only maintain the metrics recording logic, and let different stores implement their own logging logic, since this is now different across different types and are better handled separately. Also some types of stores may not even have a loggingEnabled flag, if it will always log, or will never log. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3016) Add KStream-KStream window joins
Yasuhiro Matsuda created KAFKA-3016: --- Summary: Add KStream-KStream window joins Key: KAFKA-3016 URL: https://issues.apache.org/jira/browse/KAFKA-3016 Project: Kafka Issue Type: Sub-task Components: kafka streams Affects Versions: 0.9.0.1 Reporter: Yasuhiro Matsuda Assignee: Yasuhiro Matsuda -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2984) KTable should send old values along with new values to downstreams
[ https://issues.apache.org/jira/browse/KAFKA-2984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2984: Summary: KTable should send old values along with new values to downstreams (was: KTable should send old values to downstreams) > KTable should send old values along with new values to downstreams > -- > > Key: KAFKA-2984 > URL: https://issues.apache.org/jira/browse/KAFKA-2984 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams >Affects Versions: 0.9.0.1 > Reporter: Yasuhiro Matsuda > Assignee: Yasuhiro Matsuda > > Old values are necessary for implementing aggregate functions. KTable should > augment an event with its old value. Basically KTable stream is a stream of > (key, (new value, old value)) internally. The old value may be omitted when > it is not used in the topology. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2984) KTable should send old values to downstreams
Yasuhiro Matsuda created KAFKA-2984: --- Summary: KTable should send old values to downstreams Key: KAFKA-2984 URL: https://issues.apache.org/jira/browse/KAFKA-2984 Project: Kafka Issue Type: Sub-task Components: kafka streams Affects Versions: 0.9.0.1 Reporter: Yasuhiro Matsuda Assignee: Yasuhiro Matsuda Old values are necessary for implementing aggregate functions. KTable should augment an event with its old value. Basically KTable stream is a stream of (key, (new value, old value)) internally. The old value may be omitted when it is not used in the topology. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2962) Add Simple Join API
[ https://issues.apache.org/jira/browse/KAFKA-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2962: Description: Stream-Table and Table-Table joins > Add Simple Join API > --- > > Key: KAFKA-2962 > URL: https://issues.apache.org/jira/browse/KAFKA-2962 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams >Affects Versions: 0.9.1.0 > Reporter: Yasuhiro Matsuda > Assignee: Yasuhiro Matsuda > > Stream-Table and Table-Table joins -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2962) Add Simple Join API
[ https://issues.apache.org/jira/browse/KAFKA-2962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2962: Summary: Add Simple Join API (was: Add Join API) > Add Simple Join API > --- > > Key: KAFKA-2962 > URL: https://issues.apache.org/jira/browse/KAFKA-2962 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams >Affects Versions: 0.9.1.0 > Reporter: Yasuhiro Matsuda > Assignee: Yasuhiro Matsuda > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2962) Add Join API
Yasuhiro Matsuda created KAFKA-2962: --- Summary: Add Join API Key: KAFKA-2962 URL: https://issues.apache.org/jira/browse/KAFKA-2962 Project: Kafka Issue Type: Sub-task Components: kafka streams Affects Versions: 0.9.1.0 Reporter: Yasuhiro Matsuda Assignee: Yasuhiro Matsuda -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Possible StreamingConfig Bug
The group id is removed from the restore consumer config because the restore consumer should not participate in the specified consumer group. I don't know why it is failing. On Fri, Nov 27, 2015 at 12:37 PM, Guozhang Wang wrote: > Hello Bill, > > Thanks for reporting it, this is a valid issue, could you create a ticket? > > Guozhang > > On Fri, Nov 27, 2015 at 6:19 AM, Bill Bejeck wrote: > > > All, > > > > When starting KafkaStreaming I'm getting the following error (even when > > explicitly setting the groupId with props.put("group.id > > ","test-consumer-group") > > ); > > > > Exception in thread "StreamThread-1" > > org.apache.kafka.common.KafkaException: > > org.apache.kafka.common.errors.ApiException: The configured groupId is > > invalid > > at > > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:309) > > at > > > > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:198) > > Caused by: org.apache.kafka.common.errors.ApiException: The configured > > groupId is invalid > > > > I've traced the source of the issue to the > > StreamingConfig.getConsumerConfigs method as it calls > > getRestoreConsumerConfigs (which explicitly removes the groupId property) > > vs using getBaseConsumerConfigs which returns the passed in configs > > unaltered. > > > > When I switched the method call, KafkaStreaming starts up fine. > > > > If you agree with this change/fix, I'll create a Jira ticket and put in > the > > PR, yada yada yada.. > > > > Thanks, > > Bill > > > > > > -- > -- Guozhang >
[jira] [Created] (KAFKA-2856) add KTable
Yasuhiro Matsuda created KAFKA-2856: --- Summary: add KTable Key: KAFKA-2856 URL: https://issues.apache.org/jira/browse/KAFKA-2856 Project: Kafka Issue Type: Sub-task Components: kafka streams Reporter: Yasuhiro Matsuda

KTable is a special type of stream that represents a changelog of a database table (or a key-value store). A changelog has to meet the following requirements.
* The key-value mapping is surjective in the database table (the key must be the primary key).
* All insert/update/delete events are delivered in order for the same key.
* An update event carries the whole data (not just a delta).
* A delete event is represented by a null value.
A KTable is not necessarily materialized as a local store; it may be materialized when necessary (see below). KTable supports look-up by key, and a KTable is materialized implicitly when a look-up is necessary.
* A KTable may be created from a topic. (Base KTable)
* A KTable may be created from another KTable by filter(), filterOut(), mapValues(). (Derived KTable)
* A call to the user-supplied function is skipped when the value is null, since such an event represents a deletion.
* Instead of being dropped, events filtered out by filter() or filterOut() are converted to delete events. (Can we avoid this?)
* map(), flatMap() and flatMapValues() are not supported since they may violate the changelog requirements.
A derived KTable may be persisted to a topic by to() or through(); through() creates another base KTable. A KTable can be converted to a KStream by the toStream() method.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
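The filter() semantics described in the ticket are the subtle part: on a changelog, a record whose value fails the predicate is not dropped but turned into a delete event (null value), so downstream materializations stay consistent. A pure-Java sketch of that rule, using stand-in types rather than the Kafka Streams API:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

public class KTableFilterSketch {
    // Apply a KTable-style filter to a changelog snapshot: values failing
    // the predicate become delete events (null) instead of disappearing.
    static Map<String, Integer> applyFilter(Map<String, Integer> changelog,
                                            Predicate<Integer> keep) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : changelog.entrySet()) {
            // null already means "delete"; the user function is skipped for it.
            if (e.getValue() == null || keep.test(e.getValue())) {
                out.put(e.getKey(), e.getValue());
            } else {
                out.put(e.getKey(), null); // converted to a delete event
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> changelog = new LinkedHashMap<>();
        changelog.put("a", 5);
        changelog.put("b", 50);
        changelog.put("c", null); // upstream delete event passes through
        Map<String, Integer> filtered = applyFilter(changelog, v -> v < 10);
        if (filtered.get("a") != 5) throw new AssertionError();
        // "b" fails the predicate: present as a delete, not dropped.
        if (!filtered.containsKey("b") || filtered.get("b") != null)
            throw new AssertionError();
        System.out.println(filtered);
    }
}
```

Emitting the delete keeps any downstream store correct when a key's value changes from passing to failing the predicate; silently dropping the record would leave a stale entry behind.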
[jira] [Assigned] (KAFKA-2811) Add standby tasks
[ https://issues.apache.org/jira/browse/KAFKA-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda reassigned KAFKA-2811: --- Assignee: Yasuhiro Matsuda > Add standby tasks > - > > Key: KAFKA-2811 > URL: https://issues.apache.org/jira/browse/KAFKA-2811 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams > Reporter: Yasuhiro Matsuda > Assignee: Yasuhiro Matsuda > > Restoring local state from state change-log topics can be expensive. To > alleviate this, we want to have an option to keep replicas of local > state that are kept up to date. The task assignment logic should be aware of > the existence of such replicas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2811) Add standby tasks
Yasuhiro Matsuda created KAFKA-2811: --- Summary: Add standby tasks Key: KAFKA-2811 URL: https://issues.apache.org/jira/browse/KAFKA-2811 Project: Kafka Issue Type: Sub-task Components: kafka streams Reporter: Yasuhiro Matsuda Restoring local state from state change-log topics can be expensive. To alleviate this, we want to have an option to keep replicas of local state that are kept up to date. The task assignment logic should be aware of the existence of such replicas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-2763) Reduce stream task migrations and initialization costs
[ https://issues.apache.org/jira/browse/KAFKA-2763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda reassigned KAFKA-2763: --- Assignee: Yasuhiro Matsuda > Reduce stream task migrations and initialization costs > -- > > Key: KAFKA-2763 > URL: https://issues.apache.org/jira/browse/KAFKA-2763 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams >Affects Versions: 0.9.0.0 > Reporter: Yasuhiro Matsuda > Assignee: Yasuhiro Matsuda > > Stream task assignment is not aware of either the previous task assignment or > local states of participating clients. By making the assignment logic aware > of them, we can reduce task migrations and initialization cost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2763) Reduce stream task migrations and initialization costs
Yasuhiro Matsuda created KAFKA-2763: --- Summary: Reduce stream task migrations and initialization costs Key: KAFKA-2763 URL: https://issues.apache.org/jira/browse/KAFKA-2763 Project: Kafka Issue Type: Sub-task Components: kafka streams Affects Versions: 0.9.0.0 Reporter: Yasuhiro Matsuda Stream task assignment is not aware of either the previous task assignment or local states of participating clients. By making the assignment logic aware of them, we can reduce task migrations and initialization cost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2727) initialize only the part of the topology relevant to the task
[ https://issues.apache.org/jira/browse/KAFKA-2727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2727: Description: Currently each streaming task initializes the entire topology regardless of the assigned topic-partitions. This is wasteful especially when the topology has local state stores. All local state stores are restored from their change log topics even when they are not actually used in the task execution. To fix this, the task initialization should be aware of the relevant subgraph of the topology and initialize only the processors and state stores in the subgraph. > initialize only the part of the topology relevant to the task > - > > Key: KAFKA-2727 > URL: https://issues.apache.org/jira/browse/KAFKA-2727 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams >Affects Versions: 0.9.0.0 > Reporter: Yasuhiro Matsuda > Assignee: Yasuhiro Matsuda > > Currently each streaming task initializes the entire topology regardless of > the assigned topic-partitions. This is wasteful especially when the topology > has local state stores. All local state stores are restored from their change > log topics even when they are not actually used in the task execution. To fix > this, the task initialization should be aware of the relevant subgraph of the > topology and initialize only the processors and state stores in the subgraph. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
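The subgraph computation the ticket asks for amounts to a reachability walk: start from the source nodes whose topic-partitions are assigned to the task, follow child edges, and initialize only what is reached. A pure-Java sketch with a hypothetical adjacency-map representation (not the actual ProcessorTopology code):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RelevantSubgraph {
    // BFS over the topology's parent->children edges from the task's
    // assigned source nodes; the result is the set of nodes to initialize.
    static Set<String> reachable(Map<String, List<String>> children, Set<String> sources) {
        Set<String> seen = new HashSet<>(sources);
        Deque<String> work = new ArrayDeque<>(sources);
        while (!work.isEmpty()) {
            for (String child : children.getOrDefault(work.poll(), List.of())) {
                if (seen.add(child)) work.add(child);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // Two disjoint sub-topologies; this task is only assigned source1's
        // partitions, so proc2 (and its stores) should stay uninitialized.
        Map<String, List<String>> children = Map.of(
            "source1", List.of("proc1"),
            "proc1", List.of("store1-writer"),
            "source2", List.of("proc2"));
        Set<String> init = reachable(children, Set.of("source1"));
        if (init.contains("proc2")) throw new AssertionError();
        if (!init.contains("store1-writer")) throw new AssertionError();
        System.out.println(init);
    }
}
```

Skipping the unreachable nodes is what avoids the wasteful change-log restoration the description mentions: a state store is only restored if a processor in the reachable set uses it.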
[jira] [Created] (KAFKA-2727) initialize only the part of the topology relevant to the task
Yasuhiro Matsuda created KAFKA-2727: --- Summary: initialize only the part of the topology relevant to the task Key: KAFKA-2727 URL: https://issues.apache.org/jira/browse/KAFKA-2727 Project: Kafka Issue Type: Sub-task Components: kafka streams Affects Versions: 0.9.0.0 Reporter: Yasuhiro Matsuda Assignee: Yasuhiro Matsuda -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2707) Make KStream processor names deterministic
[ https://issues.apache.org/jira/browse/KAFKA-2707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2707: Description: Currently KStream processor names are generated from an AtomicInteger static member of KStreamImpl. It is incremented every time a new processor is created. The problem is that a name depends on the history of processor creation in the same JVM, so the corresponding processors may have different names in different processes. This makes debugging difficult. > Make KStream processor names deterministic > -- > > Key: KAFKA-2707 > URL: https://issues.apache.org/jira/browse/KAFKA-2707 > Project: Kafka > Issue Type: Sub-task > Reporter: Yasuhiro Matsuda > > Currently KStream processor names are generated from an AtomicInteger static > member of KStreamImpl. It is incremented every time a new processor is > created. The problem is that a name depends on the history of processor > creation in the same JVM, so the corresponding processors may have different > names in different processes. This makes debugging difficult. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2707) Make KStream processor names deteministic
Yasuhiro Matsuda created KAFKA-2707: --- Summary: Make KStream processor names deteministic Key: KAFKA-2707 URL: https://issues.apache.org/jira/browse/KAFKA-2707 Project: Kafka Issue Type: Sub-task Reporter: Yasuhiro Matsuda -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2707) Make KStream processor names deterministic
[ https://issues.apache.org/jira/browse/KAFKA-2707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2707: Summary: Make KStream processor names deterministic (was: Make KStream processor names deteministic) > Make KStream processor names deterministic > -- > > Key: KAFKA-2707 > URL: https://issues.apache.org/jira/browse/KAFKA-2707 > Project: Kafka > Issue Type: Sub-task > Reporter: Yasuhiro Matsuda > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2706) Make state stores first class citizens in the processor DAG
Yasuhiro Matsuda created KAFKA-2706: --- Summary: Make state stores first class citizens in the processor DAG Key: KAFKA-2706 URL: https://issues.apache.org/jira/browse/KAFKA-2706 Project: Kafka Issue Type: Sub-task Components: kafka streams Affects Versions: 0.9.0.0 Reporter: Yasuhiro Matsuda Assignee: Yasuhiro Matsuda -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-2592) Stop Writing the Change-log in store.put() / delete() for Non-transactional Store
[ https://issues.apache.org/jira/browse/KAFKA-2592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda resolved KAFKA-2592. - Resolution: Won't Fix > Stop Writing the Change-log in store.put() / delete() for Non-transactional > Store > - > > Key: KAFKA-2592 > URL: https://issues.apache.org/jira/browse/KAFKA-2592 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang > Assignee: Yasuhiro Matsuda > Fix For: 0.9.0.0 > > > Today we keep a dirty threshold and try to send to change-log in store.put() > / delete() when the threshold has been exceeded. Doing this will largely > increase the likelihood of inconsistent state upon unclean shutdown. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2694) Make a task id be a composite id of a topic group id and a partition id
[ https://issues.apache.org/jira/browse/KAFKA-2694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2694: Summary: Make a task id be a composite id of a topic group id and a partition id (was: Make a task id be a composite id of a task group id and a partition id) > Make a task id be a composite id of a topic group id and a partition id > --- > > Key: KAFKA-2694 > URL: https://issues.apache.org/jira/browse/KAFKA-2694 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams > Reporter: Yasuhiro Matsuda > Fix For: 0.9.0.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-2694) Make a task id be a composite id of a topic group id and a partition id
[ https://issues.apache.org/jira/browse/KAFKA-2694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda reassigned KAFKA-2694: --- Assignee: Yasuhiro Matsuda > Make a task id be a composite id of a topic group id and a partition id > --- > > Key: KAFKA-2694 > URL: https://issues.apache.org/jira/browse/KAFKA-2694 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams > Reporter: Yasuhiro Matsuda > Assignee: Yasuhiro Matsuda > Fix For: 0.9.0.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2694) Make a task id be a composite id of a task group id and a partition id
[ https://issues.apache.org/jira/browse/KAFKA-2694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2694: Issue Type: Sub-task (was: Task) Parent: KAFKA-2590 > Make a task id be a composite id of a task group id and a partition id > -- > > Key: KAFKA-2694 > URL: https://issues.apache.org/jira/browse/KAFKA-2694 > Project: Kafka > Issue Type: Sub-task > Components: kafka streams > Reporter: Yasuhiro Matsuda > Fix For: 0.9.0.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2694) Make a task id be a composite id of a task group id and a partition id
Yasuhiro Matsuda created KAFKA-2694: --- Summary: Make a task id be a composite id of a task group id and a partition id Key: KAFKA-2694 URL: https://issues.apache.org/jira/browse/KAFKA-2694 Project: Kafka Issue Type: Task Components: kafka streams Reporter: Yasuhiro Matsuda Fix For: 0.9.0.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
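The composite id proposed in KAFKA-2694 can be pictured as a small value class pairing a topic group id with a partition id. The sketch below is illustrative only — field names, the `_`-separated string form, and the `parse` helper are assumptions, not the actual Kafka Streams implementation.

```java
// Hypothetical sketch of a composite task id (topic group id + partition id).
class TaskId {
    final int topicGroupId;
    final int partition;

    TaskId(int topicGroupId, int partition) {
        this.topicGroupId = topicGroupId;
        this.partition = partition;
    }

    @Override
    public String toString() {
        // Assumed string form: "<topicGroupId>_<partition>"
        return topicGroupId + "_" + partition;
    }

    static TaskId parse(String s) {
        int i = s.indexOf('_');
        return new TaskId(Integer.parseInt(s.substring(0, i)),
                          Integer.parseInt(s.substring(i + 1)));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TaskId)) return false;
        TaskId other = (TaskId) o;
        return topicGroupId == other.topicGroupId && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return topicGroupId * 31 + partition;
    }
}
```

A composite id like this lets tasks from different topic groups share a partition number without colliding, which is the point of the rename in this ticket.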
Re: KIP-28 does not allow Processor to specify partition of output message
>The bottom line is that *different* topics will likely need to be partitioned differently. You can do it with the existing Partitioner interface. Centralizing the logic doesn't mean all topics must use the same partitioning scheme. On Wed, Oct 14, 2015 at 1:03 PM, Randall Hauch wrote: > It absolutely is important that the partitioning logic for a single topic > be the same across an entire cluster. IOW, if a topology has a single sink, > then no matter where that topology is run in the cluster, it had better use > the same partitioning logic. I would argue that when the partitioning logic > varies from the default logic, it’s far better to encapsulate it within the > topology’s definition, and adding it to the sink is a very easy way to do > this (and very natural for the developer using Kafka Streams). > > However, centralizing the partitioning logic for all streams is certainly > not ideal, primarily because different topics will likely need to be > partitioned in different ways. This is especially true for stateful stream > processing, which depends on messages with the same key going to the same > processor instance that owns that keyed data. IOW, the partitioning logic > used by a producer is strongly informed by how the *downstream stateful > consumers* are organized/clustered. It gets far more complicated when > considering built-in topics used by offset management, state storage, and > metrics. > > The bottom line is that *different* topics will likely need to be > partitioned differently. > > On October 14, 2015 at 12:57:37 PM, Yasuhiro Matsuda ( > yasuhiro.mats...@gmail.com) wrote: > > A partitioning scheme should be a cluster wide thing. Letting each sink > have a different partitioning scheme does not make sense to me. A > partitioning scheme is not specific to a stream job, each task or a sink. > I > think specifying it at sink level is more error prone. 
> > If a user wants to customize a partitioning scheme, he/she also want to > manage it at some central place, maybe a code repo, or a jar file. All > application must use the same logic, otherwise data will be messed up. > Thus, a single class representing all partitioning logic is not a bad > thing > at all. (The code organization wise, all logic does not necessarily in the > single class, of course.) > > > On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch wrote: > > > Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a > > PR with the proposed change. > > > > Thanks! > > > > > > On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com) > > wrote: > > > > Thanks! > > > > On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch > wrote: > > Ok, cool. I agree we want something simple. I'll create an issue and > > create a pull request with a proposal. Look for it tomorrow. > > > > On Oct 13, 2015, at 10:25 PM, Guozhang Wang wrote: > > > > I see your point. Yeah I think it is a good way to add a Partitioner > into > > addSink(...) but the Partitioner interface in producer is a bit > overkill: > > > > "partition(String topic, Object key, byte[] keyBytes, Object value, > byte[] > > valueBytes, Cluster cluster)" > > > > whereas for us we only want to partition on (K key, V value). > > > > Perhaps we should add a new Partitioner interface in Kafka Streams? > > > > Guozhang > > > > On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch > wrote: > > This overrides the partitioning logic for all topics, right? That means > I > > have to explicitly call the default partitioning logic for all topics > > except those that my Producer forwards. I’m guess the best way to do by > > extending org.apache.kafka.clients.producer.DefaultProducer. Of course, > > with multiple sinks in my topology, I have to put all of the > partitioning > > logic inside a single class. 
> > > > What would you think about adding an overloaded > TopologyBuilder.addSink(…) > > method that takes a Partitioner (or better yet a smaller functional > > interface). The resulting SinkProcessor could use that Partitioner > instance > > to set the partition number? That’d be super convenient for users, would > > keep the logic where it belongs (where the topology defines the sinks), > and > > best of all the implementations won't have to worry about any other > topics, > > such as those used by stores, metrics, or other sinks. > > > > Best regards, > > > > Randall > > > > > > On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com) > > wrote: > > > > Hi Ran
Re: KIP-28 does not allow Processor to specify partition of output message
A partitioning scheme should be a cluster wide thing. Letting each sink have a different partitioning scheme does not make sense to me. A partitioning scheme is not specific to a stream job, each task or a sink. I think specifying it at sink level is more error prone. If a user wants to customize a partitioning scheme, he/she also want to manage it at some central place, maybe a code repo, or a jar file. All application must use the same logic, otherwise data will be messed up. Thus, a single class representing all partitioning logic is not a bad thing at all. (The code organization wise, all logic does not necessarily in the single class, of course.) On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch wrote: > Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a > PR with the proposed change. > > Thanks! > > > On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com) > wrote: > > Thanks! > > On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch wrote: > Ok, cool. I agree we want something simple. I'll create an issue and > create a pull request with a proposal. Look for it tomorrow. > > On Oct 13, 2015, at 10:25 PM, Guozhang Wang wrote: > > I see your point. Yeah I think it is a good way to add a Partitioner into > addSink(...) but the Partitioner interface in producer is a bit overkill: > > "partition(String topic, Object key, byte[] keyBytes, Object value, byte[] > valueBytes, Cluster cluster)" > > whereas for us we only want to partition on (K key, V value). > > Perhaps we should add a new Partitioner interface in Kafka Streams? > > Guozhang > > On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch wrote: > This overrides the partitioning logic for all topics, right? That means I > have to explicitly call the default partitioning logic for all topics > except those that my Producer forwards. I’m guess the best way to do by > extending org.apache.kafka.clients.producer.DefaultProducer. 
Of course, > with multiple sinks in my topology, I have to put all of the partitioning > logic inside a single class. > > What would you think about adding an overloaded TopologyBuilder.addSink(…) > method that takes a Partitioner (or better yet a smaller functional > interface). The resulting SinkProcessor could use that Partitioner instance > to set the partition number? That’d be super convenient for users, would > keep the logic where it belongs (where the topology defines the sinks), and > best of all the implementations won't have to worry about any other topics, > such as those used by stores, metrics, or other sinks. > > Best regards, > > Randall > > > On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com) > wrote: > > Hi Randall, > > You can try to set the partitioner class as > ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig, its interface > can be found in > > org.apache.kafka.clients.producer.Partitioner > > Let me know if it works for you. > > Guozhang > > On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch wrote: > > > The new streams API added with KIP-28 is great. I’ve been using it on a > > prototype for a few weeks, and I’m looking forward to it being included > in > > 0.9.0. However, at the moment, a Processor implementation is not able to > > specify the partition number when it outputs messages. > > > > I’d be happy to log a JIRA and create a PR to add it to the API, but > > without knowing all of the history I’m wondering if leaving it out of the > > API was intentional. > > > > Thoughts? > > > > Best regards, > > > > Randall Hauch > > > > > > -- > -- Guozhang > > > > -- > -- Guozhang > > > > -- > -- Guozhang >
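The interface Guozhang suggests — partitioning on only (K key, V value) instead of the producer's full `Partitioner` signature — could look like the sketch below. The interface name, the extra `numPartitions` argument, and the example implementation are all assumptions for illustration, not a real Kafka API.

```java
// Hypothetical Streams-level partitioner on (key, value) only,
// as discussed in the thread. Not the actual Kafka Streams interface.
interface StreamPartitioner<K, V> {
    int partition(K key, V value, int numPartitions);
}

// Example: route all events for a user to one partition so a stateful
// downstream consumer sees every event for that user.
class UserIdPartitioner implements StreamPartitioner<String, String> {
    @Override
    public int partition(String userId, String event, int numPartitions) {
        // Mask the sign bit so the modulo result is non-negative.
        return (userId.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Passing such an object to a hypothetical `addSink(...)` overload would keep the partitioning logic next to the sink it applies to, per Randall's suggestion, while other sinks and built-in topics keep the default scheme.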
Re: [DISCUSS] KIP-27 - Conditional Publish
> > >> > > > wrote: > > > >> > > > > > > >> > > >> Tangent: I think we should complete the move of Produce / > Fetch > > > RPC > > > >> to > > > >> > > >> the client libraries before we add more revisions to this > > > protocol. > > > >> > > >> > > > >> > > >> On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin > > > >> > > >> wrote: > > > >> > > >> > I missed yesterday's KIP hangout. I'm currently working on > > > another > > > >> > KIP > > > >> > > >> for > > > >> > > >> > enriched metadata of messages. Guozhang has already > created a > > > wiki > > > >> > > page > > > >> > > >> > before ( > > > >> > > >> > > > > >> > > >> > > > >> > > > > > >> > > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata > > > >> > > >> ). > > > >> > > >> > We plan to fill the relative offset to the offset field in > > the > > > >> batch > > > >> > > sent > > > >> > > >> > by producer to avoid broker side re-compression. The > message > > > >> offset > > > >> > > would > > > >> > > >> > become batch base offset + relative offset. I guess maybe > the > > > >> > expected > > > >> > > >> > offset in KIP-27 can be only set for base offset? Would > that > > > >> affect > > > >> > > >> certain > > > >> > > >> > use cases? > > > >> > > >> > > > > >> > > >> > For Jun's comments, I am not sure I completely get it. I > > think > > > the > > > >> > > >> producer > > > >> > > >> > only sends one batch per partition in a request. So either > > that > > > >> > batch > > > >> > > is > > > >> > > >> > appended or not. Why a batch would be partially committed? > > > >> > > >> > > > > >> > > >> > Thanks, > > > >> > > >> > > > > >> > > >> > Jiangjie (Becket) Qin > > > >> > > >> > > > > >> > > >> > On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin > > > wrote: > > > >> > > >> > > > > >> > > >> >> That's a fair point. 
I've added some imagined job logic to > > the > > > >> KIP, > > > >> > > so > > > >> > > >> >> we can make sure the proposal stays in sync with the > usages > > > we're > > > >> > > >> >> discussing. (The logic is just a quick sketch for now -- I > > > expect > > > >> > > I'll > > > >> > > >> >> need to elaborate it as we get into more detail, or to > > address > > > >> > other > > > >> > > >> >> concerns...) > > > >> > > >> >> > > > >> > > >> >> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao < > j...@confluent.io > > > > > > >> > wrote: > > > >> > > >> >> > For 1, yes, when there is a transient leader change, > it's > > > >> > > guaranteed > > > >> > > >> >> that a > > > >> > > >> >> > prefix of the messages in a request will be committed. > > > However, > > > >> > it > > > >> > > >> seems > > > >> > > >> >> > that the client needs to know what subset of messages > are > > > >> > > committed in > > > >> > > >> >> > order to resume the sending. Then the question is how. > > > >> > > >> >> > > > > >> > > >> >> > As Flavio indicated, for the use cases that you listed, > it > > > >>
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14640958#comment-14640958 ] Yasuhiro Matsuda commented on KAFKA-2350: - Throwing exceptions makes sense. In addition, I think a consumer should not keep pause/unpause states across rebalance. Forgetting the states makes the consumer/application logic cleaner. > Add KafkaConsumer pause capability > -- > > Key: KAFKA-2350 > URL: https://issues.apache.org/jira/browse/KAFKA-2350 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jason Gustafson > > There are some use cases in stream processing where it is helpful to be able > to pause consumption of a topic. For example, when joining two topics, you > may need to delay processing of one topic while you wait for the consumer of > the other topic to catch up. The new consumer currently doesn't provide a > nice way to do this. If you skip poll() or if you unsubscribe, then a > rebalance will be triggered and your partitions will be reassigned. > One way to achieve this would be to add two new methods to KafkaConsumer: > {code} > void pause(String... topics); > void unpause(String... topics); > {code} > When a topic is paused, a call to KafkaConsumer.poll will not initiate any > new fetches for that topic. After it is unpaused, fetches will begin again. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
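The pause semantics proposed in this ticket can be modeled as a consumer-side pause set that suppresses fetches without touching subscriptions. The toy class below mirrors the proposed `pause(String...)`/`unpause(String...)` shape; it is a sketch of the intended behavior, not the actual client code.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the proposed flow control: paused topics stay subscribed
// (so no rebalance is triggered), but poll() would skip initiating
// fetches for them until they are unpaused.
class PausableFetcher {
    private final Set<String> paused = new HashSet<>();

    void pause(String... topics) {
        for (String t : topics) paused.add(t);
    }

    void unpause(String... topics) {
        for (String t : topics) paused.remove(t);
    }

    // What poll() would consult before issuing a fetch for a topic.
    boolean shouldFetch(String topic) {
        return !paused.contains(topic);
    }
}
```

In the join use case from the description, the application would pause the topic that is ahead, keep polling (so heartbeats and assignments stay intact), and unpause once the other topic catches up.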
Re: [DISCUSS] KIP-28 - Add a transform client for data processing
Jay, I understand that. Context can provide more information without breaking the compatibility if needed. Also I am not sure ConsumerRecord is the right abstraction of data for stream processing. After transformation or join, what is the topic and the offset? It is odd to use ConsumerRecord. We can define a new record class, say StreamRecord. Isn't it an unnecessary overhead if it is created for every transformation and join? On Fri, Jul 24, 2015 at 10:06 AM, Jay Kreps wrote: > To follow on to one of Yi's points about taking ConsumerRecord vs > topic/key/value. One thing we have found is that for user-facing APIs > considering future API evolution is really important. If you do > topic/key/value and then realize you need offset added you end up having to > break everyones code. This is the idea behind things like ProducerRecord > and ConsumerRecord is that you can add additional fields without breaking > existing code. Thought I'd point that out since we've made this mistake a > few times now. > > -Jay > > On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan wrote: > > > Hi, Guozhang, > > > > Thanks for starting this. I took a quick look and had the following > > thoughts to share: > > > > - In the proposed KafkaProcessor API, there is no interface like > Collector > > that allows users to send messages to. Why is that? Is the idea to > > initialize the producer once and re-use it in the processor? And if there > > are many KStreamThreads in the process, are there going to be many > > instances of KafkaProducer although all outputs are sending to the same > > Kafka cluster? > > > > - Won’t it be simpler if the process() API just takes in the > ConsumerRecord > > as the input instead of a tuple of (topic, key, value)? > > > > - Also, the input only indicates the topic of a message. What if the > stream > > task needs to consume and produce messages from/to multiple Kafka > clusters? 
> > To support that case, there should be a system/cluster name in both input > > and output as well. > > > > - How are the output messages handled? There does not seem to have an > > interface that allows user to send an output messages to multiple output > > Kafka clusters. > > > > - It seems the proposed model also assumes one thread per processor. What > > becomes thread-local and what are shared among processors? Is the > proposed > > model targeting to have the consumers/producers become thread-local > > instances within each KafkaProcessor? What’s the cost associated with > this > > model? > > > > - One more important issue: how do we plug-in client-side partition > > management logic? Considering about the use case where the stream task > > needs to consume from multiple Kafka clusters, I am not even sure that we > > can rely on Kafka broker to maintain the consumer group membership? Maybe > > we still can get the per cluster consumer group membership and > partitions. > > However, in this case, we truly need a client-side plugin partition > > management logic to determine how to assign partitions in different Kafka > > clusters to consumers (i.e. consumers for cluster1.topic1.p1 and > > cluster2.topic2.p1 has to be assigned together to one KafkaProcessor for > > processing). Based on the full information about (group members, all > topic > > partitions) in all Kafka clusters with input topics, there should be two > > levels of partition management policies: a) how to group all topic > > partitions in all Kafka clusters to processor groups (i.e. the same > concept > > as Task group in Samza); b) how to assign the processor groups to group > > members. Note if a processor group includes topic partitions from more > than > > one Kafka clusters, it has to be assigned to the common group members in > > all relevant Kafka clusters. This can not be done just by the brokers in > a > > single Kafka cluster. 
> > > > - It seems that the intention of this KIP is also trying to put SQL/DSL > > libraries into Kafka. Why is it? Shouldn't Kafka be more focused on > hiding > > system-level integration details and leave it open for any additional > > modules outside the Kafka core to enrich the functionality that are > > user-facing? > > > > Just a few quick cents. Thanks a lot! > > > > -Yi > > > > On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede > wrote: > > > > > Ewen: > > > > > > * I think trivial filtering and aggregation on a single stream usually > > work > > > > fine with this model. > > > > > > > > > The way I see this, the process() API is an abstraction for > > > message-at-a-time computations. In the future, you could imagine > > providing > > > a simple DSL layer on top of the process() API that provides a set of > > APIs > > > for stream processing operations on sets of messages like joins, > windows > > > and various aggregations. > > > > > > * Spark (and presumably > > > > spark streaming) is supposed to get a big win by handling shuffles > such > > > > that the data just stays in cache and never actually hits disk, or at > > > l
Re: [DISCUSS] KIP-28 - Add a transform client for data processing
The goal of this KIP is to provide a lightweight/embeddable streaming framework, and allows Kafka users to start using stream processing easily. DSL is not covered in this KIP. But, DSL is a very attractive option to have. > In the proposed KafkaProcessor API, there is no interface like Collector that allows users to send messages to. Why is that? It is not stated in the KIP, but Context provides a simple interface to a producer. >Won’t it be simpler if the process() API just takes in the ConsumerRecord as the input instead of a tuple of (topic, key, value)? If Kafka implement a simple DSL something like the one in Spark, I think ConsumerRecord may not be the most convenient thing for the framework or the most intuitive thing for users. I don't think we need "topic" in the arguments. Think about a most simple application, all it needs is a key and a value. That makes the API simpler. If the application needs to access more info (topic, offset), Context should provide them. On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan wrote: > Hi, Guozhang, > > Thanks for starting this. I took a quick look and had the following > thoughts to share: > > - In the proposed KafkaProcessor API, there is no interface like Collector > that allows users to send messages to. Why is that? Is the idea to > initialize the producer once and re-use it in the processor? And if there > are many KStreamThreads in the process, are there going to be many > instances of KafkaProducer although all outputs are sending to the same > Kafka cluster? > > - Won’t it be simpler if the process() API just takes in the ConsumerRecord > as the input instead of a tuple of (topic, key, value)? > > - Also, the input only indicates the topic of a message. What if the stream > task needs to consume and produce messages from/to multiple Kafka clusters? > To support that case, there should be a system/cluster name in both input > and output as well. > > - How are the output messages handled? 
There does not seem to have an > interface that allows user to send an output messages to multiple output > Kafka clusters. > > - It seems the proposed model also assumes one thread per processor. What > becomes thread-local and what are shared among processors? Is the proposed > model targeting to have the consumers/producers become thread-local > instances within each KafkaProcessor? What’s the cost associated with this > model? > > - One more important issue: how do we plug-in client-side partition > management logic? Considering about the use case where the stream task > needs to consume from multiple Kafka clusters, I am not even sure that we > can rely on Kafka broker to maintain the consumer group membership? Maybe > we still can get the per cluster consumer group membership and partitions. > However, in this case, we truly need a client-side plugin partition > management logic to determine how to assign partitions in different Kafka > clusters to consumers (i.e. consumers for cluster1.topic1.p1 and > cluster2.topic2.p1 has to be assigned together to one KafkaProcessor for > processing). Based on the full information about (group members, all topic > partitions) in all Kafka clusters with input topics, there should be two > levels of partition management policies: a) how to group all topic > partitions in all Kafka clusters to processor groups (i.e. the same concept > as Task group in Samza); b) how to assign the processor groups to group > members. Note if a processor group includes topic partitions from more than > one Kafka clusters, it has to be assigned to the common group members in > all relevant Kafka clusters. This can not be done just by the brokers in a > single Kafka cluster. > > - It seems that the intention of this KIP is also trying to put SQL/DSL > libraries into Kafka. Why is it? 
Shouldn't Kafka be more focused on hiding > system-level integration details and leave it open for any additional > modules outside the Kafka core to enrich the functionality that are > user-facing? > > Just a few quick cents. Thanks a lot! > > -Yi > > On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede wrote: > > > Ewen: > > > > * I think trivial filtering and aggregation on a single stream usually > work > > > fine with this model. > > > > > > The way I see this, the process() API is an abstraction for > > message-at-a-time computations. In the future, you could imagine > providing > > a simple DSL layer on top of the process() API that provides a set of > APIs > > for stream processing operations on sets of messages like joins, windows > > and various aggregations. > > > > * Spark (and presumably > > > spark streaming) is supposed to get a big win by handling shuffles such > > > that the data just stays in cache and never actually hits disk, or at > > least > > > hits disk in the background. Will we take a hit because we always write > > to > > > Kafka? > > > > > > The goal isn't so much about forcing materialization of intermediate > > results into Kafka but designing th
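The message-at-a-time model being debated — process() taking just (key, value), with topic/offset and output available through a context object — can be sketched as follows. Every name here is hypothetical; this is an illustration of the discussed shape, not the KIP-28 API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical context: supplies record metadata and output, so the
// processor signature can stay a plain (key, value).
interface Context<K, V> {
    String topic();
    long offset();
    void send(String topic, K key, V value);
}

interface Processor<K, V> {
    void process(K key, V value, Context<K, V> context);
}

// Example: a trivial filter that drops null values and forwards the rest.
class FilterNulls implements Processor<String, String> {
    @Override
    public void process(String key, String value, Context<String, String> ctx) {
        if (value != null) {
            ctx.send("output-topic", key, value);
        }
    }
}
```

This layout matches the argument in the thread: the simple case touches only key and value, while topic/offset remain reachable through the context when needed, and no per-record `ConsumerRecord`-like wrapper has to be allocated after a transformation or join.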
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639460#comment-14639460 ] Yasuhiro Matsuda commented on KAFKA-2350: - Suppose we are using auto assignment. {code} subscribe(topic) // A unsubscribe(partition) // B subscribe(partition) // C {code} If rebalancing happens and the co-ordinator assigns the partition to a different instance, is the client still subscribed to the partition? Rebalance can happen 1) between A and B, 2) between B and C, or 3) after C.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637219#comment-14637219 ] Yasuhiro Matsuda commented on KAFKA-2350: - I think overloading subscribe/unsubscribe is very confusing. Subscribe/unsubscribe and pause/unpause are two very different behaviors, and overloading the same method names does not really simplify the API. I want pause/unpause to be pure flow control. It shouldn't be mixed up with subscription.
Re: [DISCUSS] KIP-27 - Conditional Publish
e offsets are > > already available -- like the commit-log-for-KV-store example -- but > > in general, being able to get the offsets from the producer interface > > does sound convenient. > > > > > We plan to have a KIP discussion meeting tomorrow at 11am PST. Perhaps > > you > > > can describe this KIP a bit then? > > > > Sure, happy to join. > > > > > Thanks, > > > > > > Jun > > > > > > > > > > > > On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin wrote: > > > > > >> Just wanted to flag a little discussion that happened on the ticket: > > >> > > >> > > > https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259 > > >> > > >> In particular, Yasuhiro Matsuda proposed an interesting variant on > > >> this that performs the offset check on the message key (instead of > > >> just the partition), with bounded space requirements, at the cost of > > >> potentially some spurious failures. (ie. the produce request may fail > > >> even if that particular key hasn't been updated recently.) This > > >> addresses a couple of the drawbacks of the per-key approach mentioned > > >> at the bottom of the KIP. > > >> > > >> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin wrote: > > >> > Hi all, > > >> > > > >> > So, perhaps it's worth adding a couple specific examples of where > this > > >> > feature is useful, to make this a bit more concrete: > > >> > > > >> > - Suppose I'm using Kafka as a commit log for a partitioned KV > store, > > >> > like Samza or Pistachio (?) do. We bootstrap the process state by > > >> > reading from that partition, and log all state updates to that > > >> > partition when we're running. Now imagine that one of my processes > > >> > locks up -- GC or similar -- and the system transitions that > partition > > >> > over to another node. 
When the GC is finished, the old 'owner' of > that > > >> > partition might still be trying to write to the commit log at the > same > > >> > as the new one is. A process might detect this by noticing that the > > >> > offset of the published message is bigger than it thought the > upcoming > > >> > offset was, which implies someone else has been writing to the > log... > > >> > but by then it's too late, and the commit log is already corrupt. > With > > >> > a 'conditional produce', one of those processes will have it's > publish > > >> > request refused -- so we've avoided corrupting the state. > > >> > > > >> > - Envision some copycat-like system, where we have some sharded > > >> > postgres setup and we're tailing each shard into its own partition. > > >> > Normally, it's fairly easy to avoid duplicates here: we can track > > >> > which offset in the WAL corresponds to which offset in Kafka, and we > > >> > know how many messages we've written to Kafka already, so the state > is > > >> > very simple. However, it is possible that for a moment -- due to > > >> > rebalancing or operator error or some other thing -- two different > > >> > nodes are tailing the same postgres shard at once! Normally this > would > > >> > introduce duplicate messages, but by specifying the expected offset, > > >> > we can avoid this. > > >> > > > >> > So perhaps it's better to say that this is useful when a single > > >> > producer is *expected*, but multiple producers are *possible*? (In > the > > >> > same way that the high-level consumer normally has 1 consumer in a > > >> > group reading from a partition, but there are small windows where > more > > >> > than one might be reading at the same time.) This is also the spirit > > >> > of the 'runtime cost' comment -- in the common case, where there is > > >> > little to no contention, there's no performance overhead either. 
I > > >> > mentioned this a little in the Motivation section -- maybe I should > > >> > flesh that out a little bit? > > >> > > > >> > For me, the motivation to work this up was that I kept
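The fencing behavior described in these examples — a produce request that is accepted only when the writer's expected offset matches the log end offset — can be modeled in a few lines. This is a toy model of the proposed KIP-27 semantics, not broker code; the class and method names are invented for illustration.

```java
// Toy model of a conditional publish: an append succeeds only if the
// producer's expected offset equals the current log end offset, so a
// stale "old owner" writing after a partition handoff is rejected
// instead of silently corrupting the commit log.
class ConditionalLog {
    private long endOffset = 0;

    // Returns the offset assigned on success, or -1 if the expected
    // offset is stale (another writer appended first).
    synchronized long append(long expectedOffset, String message) {
        if (expectedOffset != endOffset) {
            return -1;
        }
        return endOffset++;
    }
}
```

In the GC-pause scenario above, both the old and new owner would attempt `append` with the same expected offset; exactly one succeeds, and the loser learns it has been fenced without the log ever becoming corrupt.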
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634322#comment-14634322 ] Yasuhiro Matsuda commented on KAFKA-2350: - To me, state preservation is not a requirement. The application code should track pausing states. I feel that a consumer should reset its pausing state after a rebalance.
[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability
[ https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634229#comment-14634229 ] Yasuhiro Matsuda commented on KAFKA-2350: - Can we have TopicPartition rather than String for finer control? {code} void pause(TopicPartition... partitions) void unpause(TopicPartition... partitions) {code}
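A minimal sketch of what partition-granular pause bookkeeping could look like, assuming the TopicPartition-based signatures suggested above. This models only the pause state, not a real consumer, and the rebalance-reset behaviour follows the earlier comment in this thread; all class names are hypothetical:

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// A (topic, partition) pair, like Kafka's TopicPartition.
class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }
    @Override public int hashCode() { return Objects.hash(topic, partition); }
}

// Pause state tracked per partition rather than per topic.
class PauseTracker {
    private final Set<TopicPartition> paused = new HashSet<>();

    void pause(TopicPartition... partitions) { for (TopicPartition tp : partitions) paused.add(tp); }
    void unpause(TopicPartition... partitions) { for (TopicPartition tp : partitions) paused.remove(tp); }

    // poll() would consult this before issuing a fetch for the partition.
    boolean shouldFetch(TopicPartition tp) { return !paused.contains(tp); }

    // Per the comment above: reset pause state after a rebalance; the
    // application re-pauses whatever it still owns and still wants paused.
    void onRebalance() { paused.clear(); }
}
```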
[jira] [Comment Edited] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14632259#comment-14632259 ] Yasuhiro Matsuda edited comment on KAFKA-2260 at 7/18/15 4:16 AM: -- Here is the outline of the variant Jay mentioned. - A broker holds a fixed size array of offsets for each partition. Array indexes are hashes of keys. In a sense, an array element works as a sub-partition. Sub-partitions do not hold data (messages). All they have are the high water marks. - The broker maintains high water marks for each sub-partition. A sub-partition high water mark is updated when a message whose key belongs to the sub-partition is appended to the log. - An application maintains the high water mark of each partition (not sub-partition!) as it consumes messages. It doesn't need to know anything about sub-partitions in a broker. A produce request is processed as follows. 1. The producer sends the known high water mark of the partition with a message. 2. The broker compares the high water mark in the produce request and the high water mark of the sub-partition corresponding to the message key. 3. If the former is greater than or equal to the latter, the broker accepts the produce request. (Note that this is not an equality test!) 4. Otherwise, the broker rejects the request. A nice thing about this is that it is easy to increase the concurrency without re-partitioning, and its overhead is predictable. When changing the number of sub-partitions, the broker doesn't have to recompute sub-partition high water marks. It can initialize all array elements with the partition's high water mark. was (Author: yasuhiro.matsuda): Here is the outline of the variant Jay mentioned. - A broker holds a fixed size array of offsets for each partition. Array indexes are hash of keys. In a sense, an array element works as a sub-partition. Sub-partitions do not hold data (messages). All they have are the high water marks. 
- The broker maintains high water marks for each sub-partition. A sub-partition high water mark is updated when a message whose key belongs to the sub-partition is appended to the log. - An application maintains the high water mark of each partition (not sub-partition!) as it consumes messages. It doesn't need to know anything about sub-partitions in a broker. A produce request is processed as follows. 1. The producer sends the known high water mark of the partition with a message. 2. The broker compares the high water mark in the produce request and the high water mark of the sub-partition corresponding the message key. 3. If the former is greater than the latter, the broker accepts the produce request. (Note that this is not equality test!) 4. Otherwise, the broker rejects the request. A nice thing about this is that it is easy to increase the concurrency without re-partitioning, and its overhead is predictable. When changing the number of sub-partitions, the broker doesn't have to recompute sub-partition high water marks. It can initialize all array elements with the partition's high water mark. > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Kirwin >Assignee: Ewen Cheslack-Postava >Priority: Minor > Attachments: expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. 
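The numbered steps above can be sketched as follows. The names are hypothetical and the real change would live in the broker's log-append path; this only models the sub-partition high-water-mark check:

```java
import java.util.Arrays;

// Sketch of the sub-partition high-water-mark variant: one fixed-size array
// of offsets per partition, indexed by a hash of the message key.
class SubPartitionHwm {
    private final long[] hwm;       // high water mark per sub-partition
    private long partitionHwm = 0;  // partition high water mark (next offset)

    SubPartitionHwm(int subPartitions) {
        hwm = new long[subPartitions];
    }

    private int slot(String key) {
        return Math.floorMod(key.hashCode(), hwm.length);
    }

    // expectedHwm is the partition high water mark known to the producer.
    // Accept iff expectedHwm >= the sub-partition hwm for this key
    // (note: not an equality test).
    synchronized boolean tryAppend(String key, long expectedHwm) {
        int s = slot(key);
        if (expectedHwm < hwm[s]) {
            return false;           // reject: producer has a stale view of this key
        }
        partitionHwm++;             // message appended; partition hwm advances
        hwm[s] = partitionHwm;      // sub-partition hwm moves to the new log end
        return true;
    }

    // Changing the number of sub-partitions needs no recomputation:
    // seed every slot with the partition's high water mark.
    synchronized SubPartitionHwm resize(int newSize) {
        SubPartitionHwm resized = new SubPartitionHwm(newSize);
        resized.partitionHwm = partitionHwm;
        Arrays.fill(resized.hwm, partitionHwm);
        return resized;
    }
}
```

Messages with different keys that hash to different slots don't contend, which is where the extra concurrency over a single per-partition expected offset comes from.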
> This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. > - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitel
[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14632259#comment-14632259 ] Yasuhiro Matsuda commented on KAFKA-2260: - Here is the outline of the variant Jay mentioned. - A broker holds a fixed size array of offsets for each partition. Array indexes are hash of keys. In a sense, an array element works as a sub-partition. Sub-partitions do not hold data (messages). All they have are the high water marks. - The broker maintains high water marks for each sub-partition. A sub-partition high water mark is updated when a message whose key belongs to the sub-partition is appended to the log. - An application maintains the high water mark of each partition (not sub-partition!) as it consumes messages. It doesn't need to know anything about sub-partitions in a broker. A produce request is processed as follows. 1. The producer sends the known high water mark of the partition with a message. 2. The broker compares the high water mark in the produce request and the high water mark of the sub-partition corresponding the message key. 3. If the former is greater than the latter, the broker accepts the produce request. (Note that this is not equality test!) 4. Otherwise, the broker rejects the request. A nice thing about this is that it is easy to increase the concurrency without re-partitioning, and its overhead is predictable. When changing the number of sub-partitions, the broker doesn't have to recompute sub-partition high water marks. It can initialize all array elements with the partition's high water mark. > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Kirwin >Assignee: Ewen Cheslack-Postava >Priority: Minor > Attachments: expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. 
This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. > - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitely and be sure that at most one of those attempts > will succeed; and if two producers accidentally write to the end of the > partition at once, we can be certain that at least one of them will fail. > - It's possible to 'bulk load' Kafka this way -- you can write a list of n > messages consecutively to a partition, even if the list is much larger than > the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to > bootstrap, then writing any updates to that same partition -- it can be sure > that it's seen all of the messages in that partition at the moment it does > its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly > the same flavour. > h4. 
Implementation > The major advantage of this proposal over other suggested transaction / > idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a > currently-unused field, adds no new APIs, and requires very little new code > or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored > by the server. This field could be used for the 'expected offset', with a > sigil value for the current behaviour. (-1 is a natural choice, since it's > already used to mean 'next available offset'.) > - We'd need a new error and error code for a
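A sketch of the proposed semantics for the produce request's offset field, with -1 as the "next available offset" sigil described above. Names here (ExpectedOffsetLog, ANY_OFFSET) are hypothetical; this is not the attached patch:

```java
import java.util.ArrayList;
import java.util.List;

// Toy log modelling the proposal: the produce request's offset field carries
// the expected offset; -1 preserves today's unconditional behaviour.
class ExpectedOffsetLog {
    static final long ANY_OFFSET = -1L; // sigil: "next available offset"

    private final List<String> records = new ArrayList<>();

    // Returns the assigned offset, or throws on an offset mismatch
    // (standing in for the new error code the server would return).
    synchronized long produce(String record, long expectedOffset) {
        long next = records.size();
        if (expectedOffset != ANY_OFFSET && expectedOffset != next) {
            throw new IllegalStateException(
                "expected offset " + expectedOffset + " but log end is " + next);
        }
        records.add(record);
        return next;
    }
}
```

This is what makes retries idempotent: at most one attempt at `produce(msg, 5)` can land at offset 5, so a retry after an unnoticed success fails fast instead of writing a duplicate, while `ANY_OFFSET` callers are unaffected.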
[jira] [Updated] (KAFKA-2226) NullPointerException in TestPurgatoryPerformance
[ https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2226: Attachment: KAFKA-2226_2015-05-29_15:10:24.patch > NullPointerException in TestPurgatoryPerformance > > > Key: KAFKA-2226 > URL: https://issues.apache.org/jira/browse/KAFKA-2226 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman > Assignee: Yasuhiro Matsuda > Attachments: KAFKA-2226.patch, KAFKA-2226_2015-05-28_17:18:55.patch, > KAFKA-2226_2015-05-29_10:49:34.patch, KAFKA-2226_2015-05-29_15:04:35.patch, > KAFKA-2226_2015-05-29_15:10:24.patch > > > A NullPointerException sometimes shows up in TimerTaskList.remove while > running TestPurgatoryPerformance. I’m on the HEAD of trunk. > {code} > > ./bin/kafka-run-class.sh kafka.TestPurgatoryPerformance --key-space-size > > 10 --keys 3 --num 10 --pct50 50 --pct75 75 --rate 1000 --size 50 > > --timeout 20 > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/Users/okaraman/code/kafka/core/build/dependant-libs-2.10.5/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/Users/okaraman/code/kafka/core/build/dependant-libs-2.10.5/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. 
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > [2015-05-27 10:02:14,782] ERROR [completion thread], Error due to > (kafka.TestPurgatoryPerformance$CompletionQueue$$anon$1) > java.lang.NullPointerException > at kafka.utils.timer.TimerTaskList.remove(TimerTaskList.scala:80) > at kafka.utils.timer.TimerTaskEntry.remove(TimerTaskList.scala:128) > at kafka.utils.timer.TimerTask$class.cancel(TimerTask.scala:27) > at kafka.server.DelayedOperation.cancel(DelayedOperation.scala:50) > at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:71) > at > kafka.TestPurgatoryPerformance$CompletionQueue$$anon$1.doWork(TestPurgatoryPerformance.scala:263) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2226) NullPointerException in TestPurgatoryPerformance
[ https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14565538#comment-14565538 ] Yasuhiro Matsuda commented on KAFKA-2226: - Updated reviewboard https://reviews.apache.org/r/34734/diff/ against branch origin/trunk
Re: Review Request 34734: Patch for KAFKA-2226
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/ --- (Updated May 29, 2015, 10:10 p.m.) Review request for kafka. Bugs: KAFKA-2226 https://issues.apache.org/jira/browse/KAFKA-2226 Repository: kafka Description --- fix a race condition in TimerTaskEntry.remove Diffs (updated) - core/src/main/scala/kafka/utils/timer/Timer.scala b8cde820a770a4e894804f1c268b24b529940650 core/src/main/scala/kafka/utils/timer/TimerTask.scala 3407138115d579339ffb6b00e32e38c984ac5d6e core/src/main/scala/kafka/utils/timer/TimerTaskList.scala e7a96570ddc2367583d6d5590628db7e7f6ba39b core/src/main/scala/kafka/utils/timer/TimingWheel.scala e92aba3844dbf3372182e14536a5d98cf3366d73 Diff: https://reviews.apache.org/r/34734/diff/ Testing --- Thanks, Yasuhiro Matsuda
Re: Review Request 34734: Patch for KAFKA-2226
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/ --- (Updated May 29, 2015, 10:04 p.m.) Review request for kafka. Bugs: KAFKA-2226 https://issues.apache.org/jira/browse/KAFKA-2226 Repository: kafka Description --- fix a race condition in TimerTaskEntry.remove Diffs (updated) - core/src/main/scala/kafka/utils/timer/Timer.scala b8cde820a770a4e894804f1c268b24b529940650 core/src/main/scala/kafka/utils/timer/TimerTask.scala 3407138115d579339ffb6b00e32e38c984ac5d6e core/src/main/scala/kafka/utils/timer/TimerTaskList.scala e7a96570ddc2367583d6d5590628db7e7f6ba39b core/src/main/scala/kafka/utils/timer/TimingWheel.scala e92aba3844dbf3372182e14536a5d98cf3366d73 Diff: https://reviews.apache.org/r/34734/diff/ Testing --- Thanks, Yasuhiro Matsuda
[jira] [Updated] (KAFKA-2226) NullPointerException in TestPurgatoryPerformance
[ https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2226: Attachment: KAFKA-2226_2015-05-29_15:04:35.patch
[jira] [Commented] (KAFKA-2226) NullPointerException in TestPurgatoryPerformance
[ https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14565522#comment-14565522 ] Yasuhiro Matsuda commented on KAFKA-2226: - Updated reviewboard https://reviews.apache.org/r/34734/diff/ against branch origin/trunk
Re: Review Request 34734: Patch for KAFKA-2226
> On May 29, 2015, 7:08 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/Timer.scala, line 54 > > <https://reviews.apache.org/r/34734/diff/3/?file=974375#file974375line54> > > > > canceled => cancelled I will fix it. By the way, "canceled" is a legitimate spelling in American English. - Yasuhiro --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/#review85764 --- On May 29, 2015, 5:49 p.m., Yasuhiro Matsuda wrote: > > --- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/34734/ > --- > > (Updated May 29, 2015, 5:49 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2226 > https://issues.apache.org/jira/browse/KAFKA-2226 > > > Repository: kafka > > > Description > --- > > fix a race condition in TimerTaskEntry.remove > > > Diffs > - > > core/src/main/scala/kafka/utils/timer/Timer.scala > b8cde820a770a4e894804f1c268b24b529940650 > core/src/main/scala/kafka/utils/timer/TimerTask.scala > 3407138115d579339ffb6b00e32e38c984ac5d6e > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala > e7a96570ddc2367583d6d5590628db7e7f6ba39b > core/src/main/scala/kafka/utils/timer/TimingWheel.scala > e92aba3844dbf3372182e14536a5d98cf3366d73 > > Diff: https://reviews.apache.org/r/34734/diff/ > > > Testing > --- > > > Thanks, > > Yasuhiro Matsuda > >
Re: Review Request 34734: Patch for KAFKA-2226
> On May 28, 2015, 7:10 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, lines 64-65 > > <https://reviews.apache.org/r/34734/diff/1/?file=973063#file973063line64> > > > > Could you explain a bit why this is needed? It seems that we can add > > the entry either when it's created for the first time or when it's removed > > from the current list and needs to be added to a new list during reinsert. > > In both cases, the list in the entry will be null and there is no need to > > remove the entry from the list. > > Yasuhiro Matsuda wrote: > I will remove this. On second thought, I will leave this because this doesn't harm, and this ensures the consistency of the data structure without depending on callers to do the right thing. - Yasuhiro --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/#review85596 ------- On May 29, 2015, 5:49 p.m., Yasuhiro Matsuda wrote: > > --- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/34734/ > --- > > (Updated May 29, 2015, 5:49 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2226 > https://issues.apache.org/jira/browse/KAFKA-2226 > > > Repository: kafka > > > Description > --- > > fix a race condition in TimerTaskEntry.remove > > > Diffs > - > > core/src/main/scala/kafka/utils/timer/Timer.scala > b8cde820a770a4e894804f1c268b24b529940650 > core/src/main/scala/kafka/utils/timer/TimerTask.scala > 3407138115d579339ffb6b00e32e38c984ac5d6e > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala > e7a96570ddc2367583d6d5590628db7e7f6ba39b > core/src/main/scala/kafka/utils/timer/TimingWheel.scala > e92aba3844dbf3372182e14536a5d98cf3366d73 > > Diff: https://reviews.apache.org/r/34734/diff/ > > > Testing > --- > > > Thanks, > > Yasuhiro Matsuda > >
[jira] [Commented] (KAFKA-2226) NullPointerException in TestPurgatoryPerformance
[ https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14565133#comment-14565133 ] Yasuhiro Matsuda commented on KAFKA-2226: - Updated reviewboard https://reviews.apache.org/r/34734/diff/ against branch origin/trunk
Re: Review Request 34734: Patch for KAFKA-2226
> On May 28, 2015, 7:10 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, lines 64-65 > > <https://reviews.apache.org/r/34734/diff/1/?file=973063#file973063line64> > > > > Could you explain a bit why this is needed? It seems that we can add > > the entry either when it's created for the first time or when it's removed > > from the current list and needs to be added to a new list during reinsert. > > In both cases, the list in the entry will be null and there is no need to > > remove the entry from the list. I will remove this. - Yasuhiro --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/#review85596 ------- On May 29, 2015, 5:49 p.m., Yasuhiro Matsuda wrote: > > --- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/34734/ > --- > > (Updated May 29, 2015, 5:49 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2226 > https://issues.apache.org/jira/browse/KAFKA-2226 > > > Repository: kafka > > > Description > --- > > fix a race condition in TimerTaskEntry.remove > > > Diffs > - > > core/src/main/scala/kafka/utils/timer/Timer.scala > b8cde820a770a4e894804f1c268b24b529940650 > core/src/main/scala/kafka/utils/timer/TimerTask.scala > 3407138115d579339ffb6b00e32e38c984ac5d6e > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala > e7a96570ddc2367583d6d5590628db7e7f6ba39b > core/src/main/scala/kafka/utils/timer/TimingWheel.scala > e92aba3844dbf3372182e14536a5d98cf3366d73 > > Diff: https://reviews.apache.org/r/34734/diff/ > > > Testing > --- > > > Thanks, > > Yasuhiro Matsuda > >
[jira] [Updated] (KAFKA-2226) NullPointerException in TestPurgatoryPerformance
[ https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2226: Attachment: KAFKA-2226_2015-05-29_10:49:34.patch
Re: Review Request 34734: Patch for KAFKA-2226
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/ --- (Updated May 29, 2015, 5:49 p.m.) Review request for kafka. Bugs: KAFKA-2226 https://issues.apache.org/jira/browse/KAFKA-2226 Repository: kafka Description --- fix a race condition in TimerTaskEntry.remove Diffs (updated) - core/src/main/scala/kafka/utils/timer/Timer.scala b8cde820a770a4e894804f1c268b24b529940650 core/src/main/scala/kafka/utils/timer/TimerTask.scala 3407138115d579339ffb6b00e32e38c984ac5d6e core/src/main/scala/kafka/utils/timer/TimerTaskList.scala e7a96570ddc2367583d6d5590628db7e7f6ba39b core/src/main/scala/kafka/utils/timer/TimingWheel.scala e92aba3844dbf3372182e14536a5d98cf3366d73 Diff: https://reviews.apache.org/r/34734/diff/ Testing --- Thanks, Yasuhiro Matsuda
Re: Review Request 34734: Patch for KAFKA-2226
> On May 29, 2015, 1:56 a.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 106 > > <https://reviews.apache.org/r/34734/diff/2/?file=973959#file973959line106> > > > > Since this is under synchronized, it seems that remove should always > > return true? Oh. You are right. I am not sure what I was thinking. > On May 29, 2015, 1:56 a.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 153 > > <https://reviews.apache.org/r/34734/diff/2/?file=973959#file973959line153> > > > > Not sure if I follow this comment. I meant "To cancel a task, it should be removed by calling cancel() to prevent it from being reinserted." > On May 29, 2015, 1:56 a.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, lines 136-137 > > <https://reviews.apache.org/r/34734/diff/2/?file=973959#file973959line136> > > > > With this canceled flag, the logic is a bit more complicated since a > > few other places need to check this flag. Not sure how much this helps in > > reducing the probability of having a cancelled operation reinserted into > > the list. Do you think it's worth doing this? I believe that this will significantly reduce the chance of a canceled task being reinserted into a timing wheel or submitted to the task executor. - Yasuhiro --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/#review85665 ---
[jira] [Commented] (KAFKA-2226) NullPointerException in TestPurgatoryPerformance
[ https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14563965#comment-14563965 ] Yasuhiro Matsuda commented on KAFKA-2226: - Updated reviewboard https://reviews.apache.org/r/34734/diff/ against branch origin/trunk
Re: Review Request 34734: Patch for KAFKA-2226
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/ --- (Updated May 29, 2015, 12:19 a.m.) Review request for kafka. Bugs: KAFKA-2226 https://issues.apache.org/jira/browse/KAFKA-2226 Repository: kafka Description --- fix a race condition in TimerTaskEntry.remove Diffs (updated) - core/src/main/scala/kafka/utils/timer/Timer.scala b8cde820a770a4e894804f1c268b24b529940650 core/src/main/scala/kafka/utils/timer/TimerTask.scala 3407138115d579339ffb6b00e32e38c984ac5d6e core/src/main/scala/kafka/utils/timer/TimerTaskList.scala e7a96570ddc2367583d6d5590628db7e7f6ba39b core/src/main/scala/kafka/utils/timer/TimingWheel.scala e92aba3844dbf3372182e14536a5d98cf3366d73 Diff: https://reviews.apache.org/r/34734/diff/ Testing --- Thanks, Yasuhiro Matsuda
[jira] [Updated] (KAFKA-2226) NullPointerException in TestPurgatoryPerformance
[ https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2226: Attachment: KAFKA-2226_2015-05-28_17:18:55.patch
Re: Review Request 34734: Patch for KAFKA-2226
> On May 28, 2015, 7:10 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, lines 132-135 > > <https://reviews.apache.org/r/34734/diff/1/?file=973063#file973063line132> > > > > So, I guess the race condition is the following. The expiration thread > > moves a TimerTaskEntry from one TimerTaskList to another during reinsert. > > At the same time, another thread can complete an operation and try to > > remove the entry from the list. Is that right? > > > > With the patch, it seems when TimerTask.cancel tries to remove an > > entry from the list, the following can happen: (1) line 133 tests that list > > in entry is not null, (2) a reinsert happens and the entry is > > removed from the list, which sets list in the entry to null, (3) list.remove in > > line 134 is called and the entry is not removed since list is now null, (4) line > > 133 is tested again and since list is now null, we quit the loop, (5) the > > reinsert adds the entry to a new list. > > > > At this point, a completed entry still exists in the list. > > Yasuhiro Matsuda wrote: > You are right. It should be rare, but a completed entry can remain in the > list until expiration. The completion flag in DelayedOperation prevents > excess executions. So, it is not too bad. > > Jun Rao wrote: > Thanks for the clarification. Thinking about this a bit more, could we > hit a NullPointerException in step (3)? At that point, list could be null > when we call remove. > > Onur Karaman wrote: > Yeah, that check-then-act should probably be done atomically. Maybe all > changes/check-then-acts on TimerTaskEntry just need to be guarded by the > TimerTaskEntry's intrinsic lock? > > Yasuhiro Matsuda wrote: > Oops. The value of TimerTaskEntry.list should be saved to a local variable. > > Jun Rao wrote: > Ok, thanks. Perhaps we can also add some comments explaining why we need > the while loop and the rare possibility of not removing a completed operation > from the Timer. > > Guozhang Wang wrote: > Could it happen that concurrent threads are calling > TimerTaskList.add(entry) and TimerTaskList.remove(entry) on different lists > for the same entry? Since we do not synchronize on the entry object, this > could cause a race condition on next/prev/list reference manipulation. I will add a cancellation flag to TimerTaskEntry so that a canceled task won't be reinserted. This should reduce the chance of leaving a completed task in a list. Also, I will add synchronization on the TimerTaskEntry to TimerTaskList.{add|remove}. This makes one operation synchronize on both the TimerTaskList and the TimerTaskEntry. We have to be careful about ordering them; otherwise it may cause a deadlock. I hope I did it right. Please review my next patch carefully. - Yasuhiro --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/#review85596 ---
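The lock ordering described above can be sketched as follows. This is a hedged illustration in Java (the actual Kafka code is Scala), with simplified, illustrative names: both add and remove acquire the entry lock first and the list lock second, so the two locks are always taken in the same order and the deadlock the thread worries about cannot occur. The cancellation flag is the one proposed for preventing reinsertion.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class TimerTaskEntry {
    volatile TimerTaskList list;  // the list currently holding this entry
    TimerTaskEntry next, prev;
    private final AtomicBoolean canceled = new AtomicBoolean(false);

    // Cancellation flag checked before a reinsert, as proposed above.
    boolean cancel() { return canceled.compareAndSet(false, true); }
}

class TimerTaskList {
    private final TimerTaskEntry root = new TimerTaskEntry();
    { root.next = root; root.prev = root; }  // sentinel of a doubly linked list

    // Both add and remove lock the entry first, then the list. Using the
    // same order everywhere is what prevents a deadlock when two threads
    // operate on the same entry through different lists.
    void add(TimerTaskEntry entry) {
        synchronized (entry) {
            // If the entry is already in some list, remove it first to
            // keep the structure consistent.
            TimerTaskList current = entry.list;
            if (current != null) current.remove(entry);
            synchronized (this) {
                if (entry.list == null) {
                    entry.next = root;
                    entry.prev = root.prev;
                    root.prev.next = entry;
                    root.prev = entry;
                    entry.list = this;
                }
            }
        }
    }

    void remove(TimerTaskEntry entry) {
        synchronized (entry) {
            synchronized (this) {
                // Double-check: the entry may have moved to another list
                // between the caller's check and acquiring the locks.
                if (entry.list == this) {
                    entry.next.prev = entry.prev;
                    entry.prev.next = entry.next;
                    entry.next = null;
                    entry.prev = null;
                    entry.list = null;
                }
            }
        }
    }
}
```

Because add first removes the entry from whatever list currently holds it, an entry can never be linked into two lists at once, which is the invariant this part of the review is trying to protect.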
Re: Review Request 34734: Patch for KAFKA-2226
> On May 28, 2015, 7:10 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, lines 132-135 > > <https://reviews.apache.org/r/34734/diff/1/?file=973063#file973063line132> > > > > So, I guess the race condition is the following. The expiration thread > > moves a TimerTaskEntry from one TimerTaskList to another during reinsert. > > At the same time, another thread can complete an operation and try to > > remove the entry from the list. Is that right? > > > > With the patch, it seems when TimerTask.cancel tries to remove an > > entry from the list, the following can happen: (1) line 133 tests that list > > in entry is not null, (2) a reinsert happens and the entry is > > removed from the list, which sets list in the entry to null, (3) list.remove in > > line 134 is called and the entry is not removed since list is now null, (4) line > > 133 is tested again and since list is now null, we quit the loop, (5) the > > reinsert adds the entry to a new list. > > > > At this point, a completed entry still exists in the list. > > Yasuhiro Matsuda wrote: > You are right. It should be rare, but a completed entry can remain in the > list until expiration. The completion flag in DelayedOperation prevents > excess executions. So, it is not too bad. > > Jun Rao wrote: > Thanks for the clarification. Thinking about this a bit more, could we > hit a NullPointerException in step (3)? At that point, list could be null > when we call remove. > > Onur Karaman wrote: > Yeah, that check-then-act should probably be done atomically. Maybe all > changes/check-then-acts on TimerTaskEntry just need to be guarded by the > TimerTaskEntry's intrinsic lock? Oops. The value of TimerTaskEntry.list should be saved to a local variable. - Yasuhiro --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/#review85596 ---
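The "saved to a local variable" fix can be sketched like this. This is a hedged Java rendering (the real code is Scala and the names are simplified): the point is that the volatile list field is read once into a local, so the null check and the method call operate on the same reference, removing the NullPointerException window.

```java
class TimerTaskEntry {
    volatile TimerTaskList list;  // set to null when the entry is unlinked
}

class TimerTaskList {
    void remove(TimerTaskEntry entry) {
        synchronized (this) {
            // Double-check membership under the list lock.
            if (entry.list == this) entry.list = null;
        }
    }
}

class CancelFix {
    // Buggy shape from the review:
    //   while (entry.list != null) { entry.list.remove(entry); }
    // entry.list can become null between the test and the call,
    // which is the NullPointerException seen in the stack trace.
    static void removeFromList(TimerTaskEntry entry) {
        TimerTaskList currentList = entry.list;  // snapshot the volatile once
        while (currentList != null) {
            currentList.remove(entry);
            currentList = entry.list;  // re-read: non-null means a reinsert raced us
        }
    }
}
```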
Re: Review Request 34734: Patch for KAFKA-2226
> On May 28, 2015, 7:10 p.m., Jun Rao wrote: > > Thanks for the patch. A couple of questions below. > > > > Also, in TimerTask.setTimerTaskEntry(), the comment suggests that the > > TaskEntry can change for a TimerTask. However, it seems that we set the > > entry for the task only during entry construction time. So, can the > > TaskEntry in the TimerTask ever change? It can happen if a TimerTask already in a timer is submitted again or submitted to another timer. We never do such a thing, but the code handles such usage just in case. > On May 28, 2015, 7:10 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, lines 132-135 > > <https://reviews.apache.org/r/34734/diff/1/?file=973063#file973063line132> > > > > So, I guess the race condition is the following. The expiration thread > > moves a TimerTaskEntry from one TimerTaskList to another during reinsert. > > At the same time, another thread can complete an operation and try to > > remove the entry from the list. Is that right? > > > > With the patch, it seems when TimerTask.cancel tries to remove an > > entry from the list, the following can happen: (1) line 133 tests that list > > in entry is not null, (2) a reinsert happens and the entry is > > removed from the list, which sets list in the entry to null, (3) list.remove in > > line 134 is called and the entry is not removed since list is now null, (4) line > > 133 is tested again and since list is now null, we quit the loop, (5) the > > reinsert adds the entry to a new list. > > > > At this point, a completed entry still exists in the list. You are right. It should be rare, but a completed entry can remain in the list until expiration. The completion flag in DelayedOperation prevents excess executions. So, it is not too bad. - Yasuhiro --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/#review85596 ---
Re: Review Request 34734: Patch for KAFKA-2226
> On May 28, 2015, 8:42 a.m., Onur Karaman wrote: > > It seems like you're concerned about adding/removing a TimerTaskEntry that > > already exists in another TimerTaskList. Can you explain how that can > > happen? My understanding of the timing wheel stuff is only so-so. For adding, a TimerTaskEntry should not exist in any list. If it does, removing it from the existing list keeps the structure consistent. That is why I added the remove call in the add method. For removing, there is a race condition. When a bucket expires, an entry in the bucket is either expired or moved to a finer-grained wheel. TimerTaskEntry.remove is called then. The race condition happens if TimerTask.cancel is called at the same time. The remove operation is synchronized on a TimerTaskList instance. Therefore, in the synchronized block, we have to double-check that the entry still belongs to the list. If the method doesn't remove the entry from the list due to the race condition, it will retry. - Yasuhiro --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/#review85530 ---
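The retry-plus-double-check behavior described above can be illustrated with a small, hedged Java sketch (the real code is Scala; ReinsertingList is a purely illustrative stand-in for the expiry thread moving an entry between wheels, not part of any real API). The entry-side loop re-reads the list reference after each attempt, so a removal that lost the race against a reinsert is simply retried on the new list.

```java
class TimerTaskEntry {
    volatile TimerTaskList list;

    // Retry until the entry belongs to no list: each attempt locks the
    // list it saw and double-checks membership under that lock.
    void remove() {
        TimerTaskList currentList = list;
        while (currentList != null) {
            currentList.remove(this);  // no-op if we lost the race
            currentList = list;        // re-read; non-null means retry
        }
    }
}

class TimerTaskList {
    void remove(TimerTaskEntry entry) {
        synchronized (this) {
            // Double-check under the list lock: between the caller reading
            // entry.list and locking here, the entry may have moved.
            if (entry.list == this) entry.list = null;
        }
    }
}

// Simulates the race from the discussion: removing from this list also
// performs a one-shot "reinsert" into another list, standing in for a
// bucket expiry moving the entry to a finer-grained wheel.
class ReinsertingList extends TimerTaskList {
    private final TimerTaskList target;
    private boolean fired = false;
    ReinsertingList(TimerTaskList target) { this.target = target; }

    @Override
    void remove(TimerTaskEntry entry) {
        super.remove(entry);
        if (!fired) {            // fire the racing reinsert exactly once
            fired = true;
            entry.list = target;
        }
    }
}
```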
[jira] [Updated] (KAFKA-2226) NullPointerException in TestPurgatoryPerformance
[ https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2226: Status: Patch Available (was: Open)
[jira] [Commented] (KAFKA-2226) NullPointerException in TestPurgatoryPerformance
[ https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14561724#comment-14561724 ] Yasuhiro Matsuda commented on KAFKA-2226: - Created reviewboard https://reviews.apache.org/r/34734/diff/ against branch origin/trunk
Review Request 34734: Patch for KAFKA-2226
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34734/ --- Review request for kafka. Bugs: KAFKA-2226 https://issues.apache.org/jira/browse/KAFKA-2226 Repository: kafka Description --- fix a race condition in TimerTaskEntry.remove Diffs - core/src/main/scala/kafka/utils/timer/TimerTaskList.scala e7a96570ddc2367583d6d5590628db7e7f6ba39b Diff: https://reviews.apache.org/r/34734/diff/ Testing --- Thanks, Yasuhiro Matsuda
[jira] [Updated] (KAFKA-2226) NullPointerException in TestPurgatoryPerformance
[ https://issues.apache.org/jira/browse/KAFKA-2226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2226: Attachment: KAFKA-2226.patch
[jira] [Commented] (KAFKA-1989) New purgatory design
[ https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14561470#comment-14561470 ] Yasuhiro Matsuda commented on KAFKA-1989: - Thanks. I am looking into it. Would you file a separate ticket for this? > New purgatory design > > > Key: KAFKA-1989 > URL: https://issues.apache.org/jira/browse/KAFKA-1989 > Project: Kafka > Issue Type: Improvement > Components: purgatory >Affects Versions: 0.8.2.0 >Reporter: Yasuhiro Matsuda > Assignee: Yasuhiro Matsuda >Priority: Critical > Fix For: 0.8.3 > > Attachments: KAFKA-1989.patch, KAFKA-1989_2015-03-20_08:44:57.patch, > KAFKA-1989_2015-04-01_13:49:58.patch, KAFKA-1989_2015-04-07_14:59:33.patch, > KAFKA-1989_2015-04-08_13:27:59.patch, KAFKA-1989_2015-04-08_14:29:51.patch > > > This is a new design of purgatory based on Hierarchical Timing Wheels. > https://cwiki.apache.org/confluence/display/KAFKA/Purgatory+Redesign+Proposal -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33071: Dynamically load JRE-specific class in TestPurgatoryPerformance
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33071/#review80342 --- Cool! It looks good to me. - Yasuhiro Matsuda On April 10, 2015, 8:45 a.m., Rajini Sivaram wrote: > > --- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/33071/ > --- > > (Updated April 10, 2015, 8:45 a.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2113 > https://issues.apache.org/jira/browse/KAFKA-2113 > > > Repository: kafka > > > Description > --- > > Patch for KAFKA-2113: Dynamically load JRE-specific class in > TestPurgatoryPerformance > > > Diffs > - > > core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala > 962253a7260c90233d4a4c4fe8c75af211453f2a > > Diff: https://reviews.apache.org/r/33071/diff/ > > > Testing > --- > > > Thanks, > > Rajini Sivaram > >
[jira] [Commented] (KAFKA-2112) make overflowWheel volatile
[ https://issues.apache.org/jira/browse/KAFKA-2112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14487830#comment-14487830 ] Yasuhiro Matsuda commented on KAFKA-2112: - Created reviewboard https://reviews.apache.org/r/33028/diff/ against branch origin/trunk > make overflowWheel volatile > --- > > Key: KAFKA-2112 > URL: https://issues.apache.org/jira/browse/KAFKA-2112 > Project: Kafka > Issue Type: Bug > Components: purgatory > Reporter: Yasuhiro Matsuda >Assignee: Joel Koshy > Attachments: KAFKA-2112.patch > > > overflowWheel in TimingWheel needs to be volatile due to the issue of > Double-Checked Locking pattern with JVM. > http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2112) make overflowWheel volatile
[ https://issues.apache.org/jira/browse/KAFKA-2112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2112: Attachment: KAFKA-2112.patch
[jira] [Updated] (KAFKA-2112) make overflowWheel volatile
[ https://issues.apache.org/jira/browse/KAFKA-2112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2112: Assignee: Yasuhiro Matsuda (was: Joel Koshy) Status: Patch Available (was: Open)
Review Request 33028: Patch for KAFKA-2112
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33028/ --- Review request for kafka. Bugs: KAFKA-2112 https://issues.apache.org/jira/browse/KAFKA-2112 Repository: kafka Description --- make overflowWheel volatile to deal with the issue of double checked locking in JVM Diffs - core/src/main/scala/kafka/utils/timer/TimingWheel.scala 9a36c20de462b5e8716d3a0653be33c5b842eed8 Diff: https://reviews.apache.org/r/33028/diff/ Testing --- Thanks, Yasuhiro Matsuda
[jira] [Created] (KAFKA-2112) make overflowWheel volatile
Yasuhiro Matsuda created KAFKA-2112: --- Summary: make overflowWheel volatile Key: KAFKA-2112 URL: https://issues.apache.org/jira/browse/KAFKA-2112 Project: Kafka Issue Type: Bug Components: purgatory Reporter: Yasuhiro Matsuda Assignee: Joel Koshy overflowWheel in TimingWheel needs to be volatile due to the issue of Double-Checked Locking pattern with JVM. http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
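The hazard behind this ticket is the classic double-checked locking problem: without volatile, a second thread can observe a non-null overflowWheel reference before the object's fields are fully published. A hedged Java sketch follows (the real class is Scala; the constructor parameters and the growth factor here are illustrative, not the real TimingWheel API):

```java
class TimingWheel {
    // volatile gives the write release semantics, so a reader that sees a
    // non-null overflowWheel also sees a fully constructed object. Without
    // it, the unlocked first check below could return a partially
    // initialized wheel.
    private volatile TimingWheel overflowWheel = null;
    private final long interval;

    TimingWheel(long interval) { this.interval = interval; }

    private void addOverflowWheel() {
        synchronized (this) {
            if (overflowWheel == null) {  // second check, under the lock
                overflowWheel = new TimingWheel(interval * 20);
            }
        }
    }

    TimingWheel overflow() {
        if (overflowWheel == null) addOverflowWheel();  // first check, unlocked
        return overflowWheel;
    }
}
```

The pattern stays lazy (no lock on the fast path once the wheel exists) while the volatile read/write pair makes the publication safe, which is exactly what the linked article on double-checked locking prescribes.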
[jira] [Commented] (KAFKA-1989) New purgatory design
[ https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486099#comment-14486099 ] Yasuhiro Matsuda commented on KAFKA-1989: - Updated reviewboard https://reviews.apache.org/r/31568/diff/ against branch origin/trunk
[jira] [Updated] (KAFKA-1989) New purgatory design
[ https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-1989: Attachment: KAFKA-1989_2015-04-08_14:29:51.patch
Re: Review Request 31568: Patch for KAFKA-1989
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/ ---

(Updated April 8, 2015, 9:30 p.m.)

Review request for kafka.

Bugs: KAFKA-1989
https://issues.apache.org/jira/browse/KAFKA-1989

Repository: kafka

Description
---
new purgatory implementation

Diffs (updated)
- core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5
- core/src/main/scala/kafka/server/ReplicaManager.scala b06f00bc10acb90083714edb5815306d1f646ddc
- core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION
- core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION
- core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION
- core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION
- core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c
- core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION
- core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION

Diff: https://reviews.apache.org/r/31568/diff/

Testing
---

Thanks,
Yasuhiro Matsuda
[jira] [Updated] (KAFKA-1989) New purgatory design
[ https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-1989: Attachment: KAFKA-1989_2015-04-08_13:27:59.patch
[jira] [Commented] (KAFKA-1989) New purgatory design
[ https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485950#comment-14485950 ] Yasuhiro Matsuda commented on KAFKA-1989: Updated reviewboard https://reviews.apache.org/r/31568/diff/ against branch origin/trunk
Re: Review Request 31568: Patch for KAFKA-1989
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/ ---

(Updated April 8, 2015, 8:28 p.m.)

Review request for kafka.

Bugs: KAFKA-1989
https://issues.apache.org/jira/browse/KAFKA-1989

Repository: kafka

Description
---
new purgatory implementation

Diffs (updated): file list unchanged. Diff: https://reviews.apache.org/r/31568/diff/

Thanks,
Yasuhiro Matsuda
Re: Review Request 31568: Patch for KAFKA-1989
> On April 8, 2015, 7:15 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 123-126
> > <https://reviews.apache.org/r/31568/diff/4/?file=920086#file920086line123>
> >
> > We do not need to address it in this patch but may think about it moving forward: for now delayed produce / fetch's onComplete() only writes some data to the socket, while in the future other delayed operations may do more work in onComplete() and hence take time; we may want to extend it to multiple executor threads in the pool.

I agree!

- Yasuhiro

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review79408 ---
Re: Review Request 31568: Patch for KAFKA-1989
> On April 8, 2015, 6:44 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 59
> > <https://reviews.apache.org/r/31568/diff/3-4/?file=912743#file912743line59>
> >
> > Each bucket here needs to cover a window of size 3, right?

Sorry, I messed up the example.

> On April 8, 2015, 6:44 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, lines 67-68
> > <https://reviews.apache.org/r/31568/diff/3-4/?file=912743#file912743line67>
> >
> > Aren't these two levels starting at c+1?

Level 2 and 3 do not have buckets starting at c+1.

> On April 8, 2015, 6:44 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 96
> > <https://reviews.apache.org/r/31568/diff/4/?file=920091#file920091line96>
> >
> > Does this need to be volatile since all accesses to it are synchronized?

Actually, they are not synchronized. Accesses are protected by read/write locks in Timer. I think it is necessary to have volatile.

- Yasuhiro

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review79269 ---
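The bucket arithmetic being discussed above can be illustrated with a small sketch. This is not Kafka's actual kafka.utils.timer.TimingWheel; the class and method names here are hypothetical. It shows how an expiration time maps to a bucket of a hierarchical timing wheel, and how a delay beyond the current wheel's interval is handed to a lazily created, coarser-grained overflow wheel.

```java
// Hypothetical sketch of hierarchical timing wheel bucket selection
// (names are invented, not Kafka's real code).
final class WheelSketch {
    final long tickMs;   // time span covered by one bucket at this level
    final int size;      // number of buckets per wheel
    final long interval; // tickMs * size: span of the whole wheel
    long currentTime;    // rounded down to a multiple of tickMs
    WheelSketch overflow; // next coarser level, created on demand

    WheelSketch(long tickMs, int size, long startMs) {
        this.tickMs = tickMs;
        this.size = size;
        this.interval = tickMs * size;
        this.currentTime = startMs - (startMs % tickMs);
    }

    // Returns "level:bucket" describing where a task expiring at expMs lands,
    // or "expired" if it is already within the current tick.
    String add(long expMs, int level) {
        if (expMs < currentTime + tickMs) {
            return "expired";                       // too close to schedule
        } else if (expMs < currentTime + interval) {
            long virtualId = expMs / tickMs;        // bucket covering expMs
            return level + ":" + (int) (virtualId % size);
        } else {
            if (overflow == null)                   // lazily build coarser wheel
                overflow = new WheelSketch(interval, size, currentTime);
            return overflow.add(expMs, level + 1);  // one tick there = whole wheel here
        }
    }
}
```

With a 20-bucket wheel of 1 ms ticks starting at time 0, an expiration at 5 ms lands in bucket 5 of level 0, while 25 ms overflows to level 1, whose ticks are 20 ms wide.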
Re: Review Request 31568: Patch for KAFKA-1989
> On April 6, 2015, 10:35 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 104-105
> > <https://reviews.apache.org/r/31568/diff/3/?file=912738#file912738line104>
> >
> > We probably should call forceComplete() first and only if it returns true, run onExpiration().
>
> Yasuhiro Matsuda wrote:
> This came from the original ExpiredOperationReaper.expireNext(). Also the comment on onExpiration says, "Call-back to execute when a delayed operation expires, but before completion." So, I cannot call forceComplete before onExpiration. I think we can do a little refactoring to clean this up later.
>
> Guozhang Wang wrote:
> Yeah I think this is actually an old bug rather than introduced by this patch: if the task is already completed before it is timed out we should not mark it as "expired". We should change the comment of onExpiration to "Call-back to execute when a delayed operation gets expired and hence forced to complete."

We are changing the behaviour as Guozhang suggested.

- Yasuhiro

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review78668 ---
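The ordering agreed on above, forceComplete() first and onExpiration() only if that call actually forced the completion, can be sketched as follows. This is a simplified hypothetical model, not the real DelayedOperation class; the point is that an operation completed normally before its timeout never fires the expiration callback, since only one caller can win the compare-and-set.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the "forceComplete first" expiration ordering.
abstract class DelayedOpSketch {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    // Returns true only for the single caller that actually completes the op.
    final boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    // Invoked by the timer on timeout: count as expired only if the timeout
    // is what forced the completion.
    final void onTimeout() {
        if (forceComplete())
            onExpiration();
    }

    abstract void onComplete();
    abstract void onExpiration();
}

// Minimal concrete subclass that just counts callback invocations.
final class CountingOp extends DelayedOpSketch {
    final AtomicInteger completions = new AtomicInteger();
    final AtomicInteger expirations = new AtomicInteger();
    void onComplete()   { completions.incrementAndGet(); }
    void onExpiration() { expirations.incrementAndGet(); }
}
```

If the operation is satisfied normally and the timer fires later, onComplete() runs exactly once and onExpiration() never runs, which is the behaviour change described in the reply.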
Re: Review Request 31568: Patch for KAFKA-1989
> On April 7, 2015, 10:17 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 178-188
> > <https://reviews.apache.org/r/31568/diff/3/?file=912738#file912738line178>
> >
> > What is the usage of watchCreated here? It seems it only gets initialized as false, and if the function does not return in line 182, it will always execute line 188. So would that be equal to just moving line 188 to after line 190?

It is used to increment estimatedTotalOperations only once per operation that actually creates a watcher. After watchCreated is set to true, the subsequent iterations won't increment estimatedTotalOperations.

> On April 7, 2015, 10:17 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimingWheel.scala, line 101
> > <https://reviews.apache.org/r/31568/diff/3/?file=912743#file912743line101>
> >
> > We already check (overflowWheel == null) inside the function, hence it can be removed here.

This is an optimization to avoid synchronization over and over. Once overflowWheel is created, it is reduced to a single null compare (no method call, no synchronization), which is the cheapest.

- Yasuhiro

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review79257 ---
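The optimization described in the second answer, an unsynchronized null check in front of the synchronized creation, is the classic lazy-initialization fast path, and it is also why the field needs to be volatile even though the write is synchronized: the fast-path read takes no lock. A hypothetical minimal version (not the real TimingWheel code):

```java
// Hypothetical sketch of the lazy overflow-wheel creation pattern.
final class LazyOverflow {
    // volatile: the unsynchronized fast-path read must see a fully
    // constructed object published by the synchronized writer.
    private volatile Object overflowWheel;

    private synchronized void addOverflowWheel() {
        if (overflowWheel == null)      // re-check under the lock
            overflowWheel = new Object();
    }

    Object get() {
        if (overflowWheel == null)      // fast path: one volatile read, no lock
            addOverflowWheel();
        return overflowWheel;
    }
}
```

After the first call, every subsequent get() costs only the single null compare, which is what the reply means by "the cheapest".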
[jira] [Commented] (KAFKA-1989) New purgatory design
[ https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14484170#comment-14484170 ] Yasuhiro Matsuda commented on KAFKA-1989: Updated reviewboard https://reviews.apache.org/r/31568/diff/ against branch origin/trunk
[jira] [Updated] (KAFKA-1989) New purgatory design
[ https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-1989: Attachment: KAFKA-1989_2015-04-07_14:59:33.patch
Re: Review Request 31568: Patch for KAFKA-1989
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/ ---

(Updated April 7, 2015, 9:59 p.m.)

Review request for kafka.

Bugs: KAFKA-1989
https://issues.apache.org/jira/browse/KAFKA-1989

Repository: kafka

Description
---
new purgatory implementation

Diffs (updated): file list unchanged. Diff: https://reviews.apache.org/r/31568/diff/

Thanks,
Yasuhiro Matsuda
Re: Review Request 31568: Patch for KAFKA-1989
1568/diff/3/?file=912738#file912738line369>
> >
> > Perhaps we should trigger a purge if the number of total purgible operations (instead of the number of unique purgible operations) is more than the purgeInterval. This can be estimated as watched/estimatedTotalOperations.get * (estimatedTotalOperations.get - delayed).

Would you explain why that is better? It will trigger a lot more purge calls. And the frequency of calls depends on how many keys each request has. When the average number of keys per operation is large, it is possible to have a case where the total number of watchers exceeds the threshold, but there are only a few distinct operations to remove. It is hard to tune.

- Yasuhiro

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review78668 ---
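For concreteness, Jun's estimate of the total number of purgeable watcher entries can be computed as below. The numbers are invented for illustration: watched is the total entry count across all watcher lists, totalOps stands for estimatedTotalOperations (operations ever watched), and delayed is the number of operations still pending.

```java
// Illustration of the purge-trigger estimate quoted above (my own numbers).
final class PurgeEstimate {
    // watched / totalOps ≈ average number of keys per operation;
    // (totalOps - delayed) ≈ operations already completed, whose watcher
    // entries are stale and eligible for purging.
    static double purgeable(long watched, long totalOps, long delayed) {
        double keysPerOp = (double) watched / totalOps;
        return keysPerOp * (totalOps - delayed);
    }
}
```

With 10,000 watcher entries over 2,000 operations of which 500 are still pending, the estimate is 5 keys per operation times 1,500 completed operations, i.e. 7,500 purgeable entries, even though only 1,500 distinct operations would be removed, which is the mismatch the reply objects to.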
[jira] [Updated] (KAFKA-1989) New purgatory design
[ https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-1989: Attachment: KAFKA-1989_2015-04-01_13:49:58.patch
[jira] [Commented] (KAFKA-1989) New purgatory design
[ https://issues.apache.org/jira/browse/KAFKA-1989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14391422#comment-14391422 ] Yasuhiro Matsuda commented on KAFKA-1989: Updated reviewboard https://reviews.apache.org/r/31568/diff/ against branch origin/trunk
Re: Review Request 31568: Patch for KAFKA-1989
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/ ---

(Updated April 1, 2015, 8:50 p.m.)

Review request for kafka.

Bugs: KAFKA-1989
https://issues.apache.org/jira/browse/KAFKA-1989

Repository: kafka

Description
---
new purgatory implementation

Diffs (updated)
- core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5
- core/src/main/scala/kafka/server/ReplicaManager.scala 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6
- core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION
- core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION
- core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION
- core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION
- core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c
- core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION
- core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION

Diff: https://reviews.apache.org/r/31568/diff/

Thanks,
Yasuhiro Matsuda
Re: Review Request 31568: Patch for KAFKA-1989
ala, lines 55-59
> > <https://reviews.apache.org/r/31568/diff/2/?file=901432#file901432line55>
> >
> > Not quite sure what this is testing. It's not clear to me why the sharedCounter won't increase after add. Perhaps, we can add some comments.

It is testing that reinserting the existing tasks doesn't change the task count. I will add comments.

- Yasuhiro

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review78444 ---
Re: Review Request 31893: Patch for KAFKA-2013
> On March 31, 2015, 11:47 p.m., Jun Rao wrote:
> > core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala, line 261
> > <https://reviews.apache.org/r/31893/diff/5/?file=900766#file900766line261>
> >
> > Could we just put FakeOperation to the delayQueue directly instead of wrapping it under Scheduled?

We have to use completion time instead of expiration time for CompletionQueue. That is why the operation is wrapped.

- Yasuhiro

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31893/#review76975 ---

On April 1, 2015, 12:31 a.m., Yasuhiro Matsuda wrote:
> (Updated April 1, 2015, 12:31 a.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2013
> https://issues.apache.org/jira/browse/KAFKA-2013
>
> Repository: kafka
>
> Description
> ---
> purgatory micro benchmark
>
> Diffs
> - core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/31893/diff/
>
> Thanks,
> Yasuhiro Matsuda
[jira] [Updated] (KAFKA-2013) benchmark test for the purgatory
[ https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2013:

Attachment: KAFKA-2013_2015-03-31_17:30:56.patch

Key: KAFKA-2013
URL: https://issues.apache.org/jira/browse/KAFKA-2013
Project: Kafka
Issue Type: Test
Components: purgatory
Reporter: Yasuhiro Matsuda
Assignee: Yasuhiro Matsuda
Priority: Trivial
Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch, KAFKA-2013_2015-03-16_14:13:20.patch, KAFKA-2013_2015-03-16_14:39:07.patch, KAFKA-2013_2015-03-19_16:30:52.patch, KAFKA-2013_2015-03-31_17:30:56.patch

We need a micro benchmark test for measuring the purgatory performance.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2013) benchmark test for the purgatory
[ https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14389764#comment-14389764 ] Yasuhiro Matsuda commented on KAFKA-2013: Updated reviewboard https://reviews.apache.org/r/31893/diff/ against branch origin/trunk