[jira] [Commented] (KAFKA-4829) Improve logging of StreamTask commits

2017-06-14 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049834#comment-16049834
 ] 

Jeyhun Karimov commented on KAFKA-4829:
---

[~guozhang] I would suggest a model similar to SQL Server's recovery-model configs 
(https://docs.microsoft.com/en-us/sql/relational-databases/backup-restore/recovery-models-sql-server).
I think the current logging can be too verbose under high workloads, and we need a 
config parameter for this. I believe this will require a KIP. WDYT?

> Improve logging of StreamTask commits
> -
>
> Key: KAFKA-4829
> URL: https://issues.apache.org/jira/browse/KAFKA-4829
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Steven Schlansker
>Priority: Minor
>  Labels: user-experience
>
> Currently I see this every commit interval:
> {code}
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 1_31
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 2_31
> {code}
> We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
> This means every commit interval we log a few hundred lines of the above
> which is an order of magnitude chattier than anything else in the log
> during normal operations.
> To improve visibility of important messages, we should reduce the chattiness 
> of normal commits and highlight abnormal commits.  An example proposal:
> existing message is fine at TRACE level for diagnostics
> {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}
> normal fast case, wrap them all up into one summary line
> {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}
> some kind of threshold / messaging in case it doesn't complete quickly or 
> logs an exception
> {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}
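The proposal above could be sketched roughly as follows. This is a hypothetical helper (the class and method names are not actual Streams code), showing how per-task commit durations could be folded into one INFO summary line while slow commits are singled out:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: collect per-task commit durations, emit one
// summary line instead of one line per task, and flag slow commits.
class CommitSummaryLogger {
    private final long slowCommitThresholdMs;
    private final Map<String, Long> commitDurationsMs = new LinkedHashMap<>();

    CommitSummaryLogger(long slowCommitThresholdMs) {
        this.slowCommitThresholdMs = slowCommitThresholdMs;
    }

    void recordCommit(String taskId, long durationMs) {
        commitDurationsMs.put(taskId, durationMs);
    }

    // The single INFO-level line replacing hundreds of per-task lines.
    String summaryLine() {
        long totalMs = commitDurationsMs.values().stream()
                .mapToLong(Long::longValue).sum();
        return commitDurationsMs.size() + " stream tasks committed in " + totalMs + "ms";
    }

    // Tasks above the threshold would be logged at ERROR level instead.
    boolean isSlow(String taskId) {
        return commitDurationsMs.getOrDefault(taskId, 0L) > slowCommitThresholdMs;
    }
}
```

The per-task TRACE line would still be emitted inside recordCommit in a real implementation; only the INFO/ERROR split is shown here.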



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-3907) Better support for user-specific committing in the Streams DSL

2017-06-14 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-3907:
-

Assignee: Jeyhun Karimov

> Better support for user-specific committing in the Streams DSL
> --
>
> Key: KAFKA-3907
> URL: https://issues.apache.org/jira/browse/KAFKA-3907
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: api
>
> Currently, to commit the current processing state in a user-specific way, 
> users can make use of the {{ProcessorContext}} object, which is exposed in 
> the {{Processor}} API. In addition, the application will also 
> automatically commit the processing state based on the configured 
> interval.
> Hence in the Streams DSL, if a user wants to explicitly call {{commit}}, she 
> needs to use a {{process(ProcessorSupplier)}} API to get a customized 
> processor instance in order to access the {{ProcessorContext}}. We should 
> think of a better way to support user-specific committing interfaces inside 
> the high-level Streams DSL.
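The workaround described above can be illustrated with simplified stubs (the interfaces below are stand-ins, not the real Streams API): a user who wants explicit commits today must write a processor that calls {{ProcessorContext#commit}} itself.

```java
// Simplified stand-in for org.apache.kafka.streams.processor.ProcessorContext.
interface ProcessorContextStub {
    void commit();
}

// A processor a DSL user must currently write (and plug in via
// process(ProcessorSupplier)) to commit explicitly, e.g. every n records.
class CommitEveryN {
    private final ProcessorContextStub context;
    private final int n;
    private int seen = 0;

    CommitEveryN(ProcessorContextStub context, int n) {
        this.context = context;
        this.n = n;
    }

    void process(String key, String value) {
        if (++seen % n == 0) {
            context.commit(); // request a commit of the processing state
        }
    }
}
```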



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-3826) Sampling on throughput / latency metrics recording in Streams

2017-06-12 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046497#comment-16046497
 ] 

Jeyhun Karimov edited comment on KAFKA-3826 at 6/12/17 12:32 PM:
-

[~guozhang] I think KAFKA-4829 can also be related to this JIRA, and all the 
related issues need a one-shot solution. 
I think providing a user-defined sampling function for metrics like latency and 
throughput is feasible. However, we have many log4j calls in the library that 
log for each received record, which can clearly be a bottleneck in some 
use-cases.
So, we can stick to user-defined sampling for latency and throughput, and for 
all logs we should provide a config that specifies the frequency of logging. 
For example, if the frequency is 1.0, the library functions will log 
everything; 0.0 will log nothing. 

WDYT? cc: [~mjsax]
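A minimal sketch of the frequency config idea (the class name and exact semantics are assumptions for illustration, not Streams code):

```java
import java.util.Random;

// Sketch: a logging-frequency config where 1.0 logs every record,
// 0.0 logs nothing, and values in between sample randomly.
class LogFrequencySampler {
    private final double frequency;
    private final Random random;

    LogFrequencySampler(double frequency, Random random) {
        if (frequency < 0.0 || frequency > 1.0) {
            throw new IllegalArgumentException("frequency must be in [0.0, 1.0]");
        }
        this.frequency = frequency;
        this.random = random;
    }

    // Call once per would-be log statement on the per-record hot path.
    boolean shouldLog() {
        if (frequency == 1.0) return true;   // log all
        if (frequency == 0.0) return false;  // log nothing
        return random.nextDouble() < frequency;
    }
}
```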


was (Author: jeyhunkarimov):
[~guozhang] I think KAFKA-4829 also can be related to this jira and all other 
related issues need one-shot solution. 
I think providing sampling function for metrics like latency and throughput is 
feasible. However, we have many log4j loggings in the library that for each 
receiving record we make a log, which can clearly be a bottleneck in some 
use-cases.
 So, we can stick to sampling for latency and throughput and for all logs we 
should provide a config that specifies the frequency of logging. For example, 
if frequency is 1.0, then the library functions will log all, 0.0 will not log. 

WDYT? cc:\[~mjsax]

> Sampling on throughput / latency metrics recording in Streams
> -
>
> Key: KAFKA-3826
> URL: https://issues.apache.org/jira/browse/KAFKA-3826
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture, performance
>
> In Kafka Streams we record throughput / latency metrics on EACH processed 
> record, causing a lot of recording overhead. Instead, we should consider 
> statistically sampling the messages flowing through to measure latency and 
> throughput.
> This is based on our observations from KAFKA-3769 and KAFKA-3811.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3826) Sampling on throughput / latency metrics recording in Streams

2017-06-12 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046497#comment-16046497
 ] 

Jeyhun Karimov commented on KAFKA-3826:
---

[~guozhang] I think KAFKA-4829 can also be related to this JIRA, and all the 
related issues need a one-shot solution. 
I think providing a sampling function for metrics like latency and throughput 
is feasible. However, we have many log4j calls in the library that log for 
each received record, which can clearly be a bottleneck in some use-cases.
So, we can stick to sampling for latency and throughput, and for all logs we 
should provide a config that specifies the frequency of logging. For example, 
if the frequency is 1.0, the library functions will log everything; 0.0 will 
log nothing. 

WDYT? cc: [~mjsax]

> Sampling on throughput / latency metrics recording in Streams
> -
>
> Key: KAFKA-3826
> URL: https://issues.apache.org/jira/browse/KAFKA-3826
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture, performance
>
> In Kafka Streams we record throughput / latency metrics on EACH processed 
> record, causing a lot of recording overhead. Instead, we should consider 
> statistically sampling the messages flowing through to measure latency and 
> throughput.
> This is based on our observations from KAFKA-3769 and KAFKA-3811.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-4653) Improve test coverage of RocksDBStore

2017-06-10 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4653:
-

Assignee: Jeyhun Karimov

> Improve test coverage of RocksDBStore
> -
>
> Key: KAFKA-4653
> URL: https://issues.apache.org/jira/browse/KAFKA-4653
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Assignee: Jeyhun Karimov
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> {{putAll}} - not covered
> {{putInternal}} - exceptions



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4658) Improve test coverage InMemoryKeyValueLoggedStore

2017-06-10 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4658:
-

Assignee: Jeyhun Karimov

> Improve test coverage InMemoryKeyValueLoggedStore
> -
>
> Key: KAFKA-4658
> URL: https://issues.apache.org/jira/browse/KAFKA-4658
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Assignee: Jeyhun Karimov
> Fix For: 0.11.1.0
>
>
> {{putAll}} not covered



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4656) Improve test coverage of CompositeReadOnlyKeyValueStore

2017-06-10 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4656:
-

Assignee: Jeyhun Karimov

> Improve test coverage of CompositeReadOnlyKeyValueStore
> ---
>
> Key: KAFKA-4656
> URL: https://issues.apache.org/jira/browse/KAFKA-4656
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Assignee: Jeyhun Karimov
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> exceptions not covered



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4659) Improve test coverage of CachingKeyValueStore

2017-06-10 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4659:
-

Assignee: Jeyhun Karimov

> Improve test coverage of CachingKeyValueStore
> -
>
> Key: KAFKA-4659
> URL: https://issues.apache.org/jira/browse/KAFKA-4659
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Assignee: Jeyhun Karimov
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> {{putIfAbsent}} mostly not covered



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4655) Improve test coverage of CompositeReadOnlySessionStore

2017-06-10 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4655:
-

Assignee: Jeyhun Karimov

> Improve test coverage of CompositeReadOnlySessionStore
> --
>
> Key: KAFKA-4655
> URL: https://issues.apache.org/jira/browse/KAFKA-4655
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Assignee: Jeyhun Karimov
> Fix For: 0.11.1.0
>
>
> exceptions in fetch and internal iterator



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4661) Improve test coverage UsePreviousTimeOnInvalidTimestamp

2017-06-09 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4661:
-

Assignee: Jeyhun Karimov

> Improve test coverage UsePreviousTimeOnInvalidTimestamp
> ---
>
> Key: KAFKA-4661
> URL: https://issues.apache.org/jira/browse/KAFKA-4661
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Assignee: Jeyhun Karimov
>Priority: Minor
>
> Exception branch not tested



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4325) Improve processing of late records for window operations

2017-06-02 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16035688#comment-16035688
 ] 

Jeyhun Karimov commented on KAFKA-4325:
---

[~mjsax], would this JIRA require a KIP? 

> Improve processing of late records for window operations
> 
>
> Key: KAFKA-4325
> URL: https://issues.apache.org/jira/browse/KAFKA-4325
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Windows are kept until their retention time has passed. If a late arriving 
> record is processed that is older than any window kept, a new window is 
> created containing this single late arriving record, the aggregation is 
> computed, and the window is immediately discarded afterward (as it is older 
> than the retention time).
> This behavior might cause problems for downstream applications, as the 
> original window aggregate might be overwritten with the late single-record 
> aggregate value. Thus, we should rather not process the late arriving record 
> in this case.
> However, data loss might not be acceptable for all use cases. In order to 
> enable the user not to lose any data, window operators should allow 
> registering a handler function that is called instead of just dropping the 
> late arriving record.
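The handler idea from the last paragraph could look roughly like this (hypothetical API, reduced to plain Java):

```java
import java.util.function.BiConsumer;

// Sketch: records older than the retention window are handed to a
// user-registered handler instead of producing a throwaway
// single-record window aggregate that would overwrite the original.
class LateRecordGate {
    private final long retentionMs;
    private final BiConsumer<String, Long> lateRecordHandler; // (key, timestamp)

    LateRecordGate(long retentionMs, BiConsumer<String, Long> lateRecordHandler) {
        this.retentionMs = retentionMs;
        this.lateRecordHandler = lateRecordHandler;
    }

    // True if the record should go through normal window processing;
    // otherwise it is routed to the handler and dropped.
    boolean accept(String key, long recordTimestampMs, long streamTimeMs) {
        if (recordTimestampMs < streamTimeMs - retentionMs) {
            lateRecordHandler.accept(key, recordTimestampMs);
            return false;
        }
        return true;
    }
}
```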



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-05-28 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027964#comment-16027964
 ] 

Jeyhun Karimov edited comment on KAFKA-4304 at 5/28/17 10:34 PM:
-

From the user perspective, one would query {{ReadOnlyKeyValueStore#get(key)}} 
to get the corresponding value and then query again via 
{{ReadOnlyKeyValueStore#getKeyWithTs}} to get the key's timestamp. As a result, 
we search twice, and only for specific use-cases like this one.

Maybe we can overload the {{ReadOnlyKeyValueStore#get}} method to return 
{{KeyWithTimestamp}}, and also add {{ReadOnlyKeyValueStore#get(key)}} for the 
use-cases requiring just the key's timestamp. Anyway, I will initiate a KIP 
for this.
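A minimal sketch of the overload being discussed, with a simplified in-memory store (the type and method names below are assumptions, not the real Streams API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper pairing a value with its key's last update time.
class ValueWithTimestamp<V> {
    final V value;
    final long timestampMs;
    ValueWithTimestamp(V value, long timestampMs) {
        this.value = value;
        this.timestampMs = timestampMs;
    }
}

// Simplified stand-in for ReadOnlyKeyValueStore with the proposed overload.
interface TimestampedStoreSketch<K, V> {
    V get(K key);                                    // existing single-value lookup
    ValueWithTimestamp<V> getWithTimestamp(K key);   // proposed: one lookup, both answers
}

class InMemoryTimestampedStore<K, V> implements TimestampedStoreSketch<K, V> {
    private final Map<K, ValueWithTimestamp<V>> entries = new HashMap<>();

    void put(K key, V value, long timestampMs) {
        entries.put(key, new ValueWithTimestamp<>(value, timestampMs));
    }

    public V get(K key) {
        ValueWithTimestamp<V> e = entries.get(key);
        return e == null ? null : e.value;
    }

    public ValueWithTimestamp<V> getWithTimestamp(K key) {
        return entries.get(key);
    }
}
```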


was (Author: jeyhunkarimov):
From the user perspective, one would query {{ReadOnlyKeyValueStore#get(key)}} 
to get the corresponding value and then she would query again for 
{{ReadOnlyKeyValueStore#getKeyWithTs}} to get key's timestamp. As a result we 
search twice. This is only for specific use-cases like this.

Maybe we can overload {{ReadOnlyKeyValueStore#get}} method to return 
{{KeyWithTimestamp}} and also add {{ReadOnlyKeyValueStore#get(key)}} for the 
use-cases only requiring just key timestamp.

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Currently, when querying a state store, it is not clear when the key was 
> last updated. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-05-28 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4304:
-

Assignee: Jeyhun Karimov

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: needs-kip
>
> Currently, when querying a state store, it is not clear when the key was 
> last updated. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-05-28 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027964#comment-16027964
 ] 

Jeyhun Karimov edited comment on KAFKA-4304 at 5/28/17 10:25 PM:
-

From the user perspective, one would query {{ReadOnlyKeyValueStore#get(key)}} 
to get the corresponding value and then query again via 
{{ReadOnlyKeyValueStore#getKeyWithTs}} to get the key's timestamp. As a result, 
we search twice, and only for specific use-cases like this one.

Maybe we can overload the {{ReadOnlyKeyValueStore#get}} method to return 
{{KeyWithTimestamp}}, and also add {{ReadOnlyKeyValueStore#get(key)}} for the 
use-cases requiring just the key's timestamp.


was (Author: jeyhunkarimov):
From the user perspective, one would query {{ReadOnlyKeyValueStore#get(key)}} 
to get the corresponding value and then she would query again for 
{{ReadOnlyKeyValueStore#getKeyWithTs}} to get key's timestamp. As a result we 
search twice. This is only for specific use-cases like this. 

Maybe we can overload {{ReadOnlyKeyValueStore#get}} method to return 
{{KeyWithTimestamp}}

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Currently, when querying a state store, it is not clear when the key was 
> last updated. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-05-28 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027964#comment-16027964
 ] 

Jeyhun Karimov commented on KAFKA-4304:
---

From the user perspective, one would query {{ReadOnlyKeyValueStore#get(key)}} 
to get the corresponding value and then query again via 
{{ReadOnlyKeyValueStore#getKeyWithTs}} to get the key's timestamp. As a result, 
we search twice, and only for specific use-cases like this one.

Maybe we can overload the {{ReadOnlyKeyValueStore#get}} method to return 
{{KeyWithTimestamp}}.

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Currently, when querying a state store, it is not clear when the key was 
> last updated. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-05-28 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027958#comment-16027958
 ] 

Jeyhun Karimov commented on KAFKA-4304:
---

[~mjsax] I completely forgot this issue. Sorry for the super late response. I 
misunderstood the issue (the top-k stuff). 
My question is: should we add a new public API to access a tuple's update 
timestamp? If yes, I think this would be inefficient. 
Or (while querying with a key) should we return the value in a "package" 
containing its (the key's) update timestamp? If yes, this would cause issues 
with backwards compatibility. 
Please correct me if I am wrong.

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: newbie++
>
> Currently, when querying a state store, it is not clear when the key was 
> last updated. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-05-19 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16017575#comment-16017575
 ] 

Jeyhun Karimov commented on KAFKA-4785:
---

[~mjsax] Thanks for the reminder. I am on it.

> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>
> Users can specify which timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply the 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as the record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics, 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different, breaking this 
> assumption.
> Thus, when reading data from an intermediate repartitioning topic, the 
> configured timestamp extractor should not be used; instead, the record's 
> metadata timestamp should be extracted as the record timestamp.
> In order to get the same behavior for intermediate user topics (i.e., used 
> in {{through()}}), we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (i.e., 
> {{FailOnInvalidTimestampExtractor}}).
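The intended routing can be sketched as follows (the record and extractor types are simplified stand-ins, not the real {{ConsumerRecord}} / {{TimestampExtractor}}):

```java
// Stand-in for a consumer record carrying both a metadata timestamp and
// whatever a custom extractor would read from the payload.
class RecordStub {
    final long metadataTimestampMs;
    final long payloadTimestampMs;
    RecordStub(long metadataTimestampMs, long payloadTimestampMs) {
        this.metadataTimestampMs = metadataTimestampMs;
        this.payloadTimestampMs = payloadTimestampMs;
    }
}

interface TimestampExtractorSketch {
    long extract(RecordStub record);
}

class InternalTopicTimestampRouter {
    // For internal repartition topics, always trust the metadata timestamp
    // (which Streams set to "stream time" on write); otherwise defer to the
    // globally configured extractor.
    static long timestampFor(RecordStub record, boolean isInternalTopic,
                             TimestampExtractorSketch configured) {
        return isInternalTopic ? record.metadataTimestampMs
                               : configured.extract(record);
    }
}
```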



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-04-29 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15989838#comment-15989838
 ] 

Jeyhun Karimov edited comment on KAFKA-4785 at 4/29/17 6:08 PM:


[~mjsax], (as discussed in KAFKA-4218) I think we can group KAFKA-4219 and 
this issue together.


was (Author: jeyhunkarimov):
[~mjsax], as it is discussed in KAFKA-4218 , we can group KAFKA-4219 and this 
issue together.

> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>
> Users can specify which timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply the 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as the record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics, 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different, breaking this 
> assumption.
> Thus, when reading data from an intermediate repartitioning topic, the 
> configured timestamp extractor should not be used; instead, the record's 
> metadata timestamp should be extracted as the record timestamp.
> In order to get the same behavior for intermediate user topics (i.e., used 
> in {{through()}}), we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (i.e., 
> {{FailOnInvalidTimestampExtractor}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2017-04-29 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15989988#comment-15989988
 ] 

Jeyhun Karimov commented on KAFKA-3745:
---

[~sree2k] is there an update on this issue?

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Sreepathi Prasanna
>Priority: Minor
>  Labels: api, needs-kip, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.
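The workaround mentioned above, explicitly threading the join key into the value ahead of the join, can be sketched with plain-Java stand-ins (not the actual Streams types):

```java
// Stand-in pair type: a value bundled with its (otherwise invisible) join key.
class KeyedValue<K, V> {
    final K key;
    final V value;
    KeyedValue(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Stand-in for the two-argument ValueJoiner, which today never sees the key.
interface ValueJoinerSketch<V1, V2, VR> {
    VR apply(V1 left, V2 right);
}

class JoinKeyWorkaround {
    // The mapValues-style step applied before the join: wrap each value
    // with its key so the joiner can read it from the left value.
    static <K, V> KeyedValue<K, V> withKey(K key, V value) {
        return new KeyedValue<>(key, value);
    }
}
```

Extending the interface to pass the key directly would make this wrapping step unnecessary.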



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-04-29 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15989838#comment-15989838
 ] 

Jeyhun Karimov commented on KAFKA-4785:
---

[~mjsax], as discussed in KAFKA-4218, we can group KAFKA-4219 and this 
issue together.

> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>
> Users can specify which timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply the 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as the record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics, 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different, breaking this 
> assumption.
> Thus, when reading data from an intermediate repartitioning topic, the 
> configured timestamp extractor should not be used; instead, the record's 
> metadata timestamp should be extracted as the record timestamp.
> In order to get the same behavior for intermediate user topics (i.e., used 
> in {{through()}}), we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (i.e., 
> {{FailOnInvalidTimestampExtractor}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4218) Enable access to key in ValueTransformer

2017-04-29 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15989837#comment-15989837
 ] 

Jeyhun Karimov commented on KAFKA-4218:
---

[~mjsax], thanks for your comments. I absolutely agree on this. I think the 
most relevant issues to group with this one are (as shown in the related links 
as well): KAFKA-4726 and KAFKA-3745 (although the latter has an assignee, I 
haven't seen a PR on it and it has been stale for some time). Do you agree, or 
are there other relevant issues that should be added to this group?


> Enable access to key in ValueTransformer
> 
>
> Key: KAFKA-4218
> URL: https://issues.apache.org/jira/browse/KAFKA-4218
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, needs-kip, newbie
>
> While transforming values via {{KStream.transformValues}} and 
> {{ValueTransformer}}, the key associated with the value may be needed, even 
> if it is not changed.  For instance, it may be used to access stores.  
> As of now, the key is not available within these methods and interfaces, 
> leading to the use of {{KStream.transform}} and {{Transformer}}, and the 
> unnecessary creation of new {{KeyValue}} objects.
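A sketch of what a key-aware value transformer could look like (the interface name is an assumption; simplified from the real Streams API):

```java
// Hypothetical: like ValueTransformer, but the (read-only) key is passed in,
// so no Transformer + new KeyValue allocation is needed just to see the key.
interface ValueTransformerWithKeySketch<K, V, VR> {
    VR transform(K readOnlyKey, V value);
}

class PrefixValueWithKey implements ValueTransformerWithKeySketch<String, String, String> {
    @Override
    public String transform(String readOnlyKey, String value) {
        // The key is readable here but never replaced, so the stream's
        // keying (and thus its partitioning) stays unchanged.
        return readOnlyKey + ":" + value;
    }
}
```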



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4218) Enable access to key in ValueTransformer

2017-04-28 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4218:
-

Assignee: Jeyhun Karimov

> Enable access to key in ValueTransformer
> 
>
> Key: KAFKA-4218
> URL: https://issues.apache.org/jira/browse/KAFKA-4218
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, needs-kip, newbie
>
> While transforming values via {{KStream.transformValues}} and 
> {{ValueTransformer}}, the key associated with the value may be needed, even 
> if it is not changed.  For instance, it may be used to access stores.  
> As of now, the key is not available within these methods and interfaces, 
> leading to the use of {{KStream.transform}} and {{Transformer}}, and the 
> unnecessary creation of new {{KeyValue}} objects.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-04-28 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15989557#comment-15989557
 ] 

Jeyhun Karimov commented on KAFKA-4785:
---

[~mjsax] Do you think KAFKA-4144 is blocking for this issue? 

> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>
> Users can specify what timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply this 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics, 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different, breaking this assumption.
> Thus, when reading data from intermediate repartitioning topics, the configured 
> timestamp extractor should not be used; instead, the record's metadata timestamp 
> should be extracted as the record timestamp.
> In order to leverage the same behavior for intermediate user topics (i.e., used 
> in {{through()}}) we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (i.e., set 
> {{FailOnInvalidTimestampExtractor}}).
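A minimal sketch of the behavior described above: for internal repartition topics the record's metadata timestamp must win over any user-configured extractor. The Record and TimestampExtractor types here are simplified stand-ins, not Kafka classes:

```java
// Sketch of the proposed behavior for repartition topics: ignore the configured
// extractor and always return the record's metadata timestamp. Record is a
// minimal stand-in for Kafka's ConsumerRecord.
public class RepartitionTimestamps {

    static class Record {
        final long metadataTimestamp;   // timestamp set by Streams on write
        final long payloadTimestamp;    // timestamp a custom extractor might read
        Record(long meta, long payload) {
            metadataTimestamp = meta;
            payloadTimestamp = payload;
        }
    }

    interface TimestampExtractor {
        long extract(Record record);
    }

    // A custom extractor a user might configure globally.
    static final TimestampExtractor CUSTOM = r -> r.payloadTimestamp;

    // The extractor Streams would pin internally for repartition topics.
    static final TimestampExtractor METADATA = r -> r.metadataTimestamp;

    static long extractFor(boolean internalRepartitionTopic, Record r) {
        // For internal repartition topics the metadata timestamp ("stream time"
        // written by the upstream subtopology) must win over the user's extractor.
        return internalRepartitionTopic ? METADATA.extract(r) : CUSTOM.extract(r);
    }
}
```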





[jira] [Assigned] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-04-28 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4785:
-

Assignee: Jeyhun Karimov

> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>
> Users can specify what timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply this 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics, 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different, breaking this assumption.
> Thus, when reading data from intermediate repartitioning topics, the configured 
> timestamp extractor should not be used; instead, the record's metadata timestamp 
> should be extracted as the record timestamp.
> In order to leverage the same behavior for intermediate user topics (i.e., used 
> in {{through()}}) we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (i.e., set 
> {{FailOnInvalidTimestampExtractor}}).





[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-04-25 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15983515#comment-15983515
 ] 

Jeyhun Karimov commented on KAFKA-4144:
---

[~mjsax] I am sorry, I didn't read your email carefully. My bad.

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, kip
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For PAPI we cannot enforce this, but for DSL 
> {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785 though.
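The per-source override semantics the KIP describes can be sketched as follows: a source may carry its own extractor, otherwise the global {{StreamsConfig}} default applies. All types here are simplified stand-ins, not Kafka classes:

```java
import java.util.Optional;

// Sketch of per-source timestamp-extractor resolution: a per-source override
// wins over the global default from the configuration. Illustrative only.
public class PerSourceExtractor {

    interface TimestampExtractor {
        long extract(long metadataTs, long payloadTs);
    }

    // Stand-in for the app-wide extractor configured via StreamsConfig.
    static final TimestampExtractor GLOBAL_DEFAULT = (meta, payload) -> payload;

    static class Source {
        final Optional<TimestampExtractor> override;

        Source(TimestampExtractor e) {
            override = Optional.ofNullable(e);
        }

        long timestampOf(long metadataTs, long payloadTs) {
            // Per-source override wins; otherwise fall back to the global config.
            return override.orElse(GLOBAL_DEFAULT).extract(metadataTs, payloadTs);
        }
    }
}
```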





[jira] [Updated] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-04-25 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated KAFKA-4144:
--
Status: Reopened  (was: Closed)

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, kip
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For PAPI we cannot enforce this, but for DSL 
> {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785 though.





[jira] [Closed] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-04-25 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov closed KAFKA-4144.
-

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, kip
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For PAPI we cannot enforce this, but for DSL 
> {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785 though.





[jira] [Resolved] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-04-25 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov resolved KAFKA-4144.
---
Resolution: Fixed
  Reviewer: Matthias J. Sax

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, kip
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For PAPI we cannot enforce this, but for DSL 
> {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785 though.





[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-01-25 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837350#comment-15837350
 ] 

Jeyhun Karimov commented on KAFKA-4144:
---

Thanks for clarification. I got the point. 

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-01-24 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836328#comment-15836328
 ] 

Jeyhun Karimov edited comment on KAFKA-4144 at 1/24/17 8:58 PM:


[~mjsax] [~elevy] Thanks for your explanation. So basically, if I want to 
launch a new stream application and pass a {{TimestampExtractor}} to it, I have 
to use the {{TimestampExtractor}} which other stream applications are using, 
because the {{configurations}} are global to all stream applications; I cannot 
define a new {{TimestampExtractor}} while other applications are running. 
Therefore, we want to pass the {{TimestampExtractor}} not via 
{{configuration}} but with the {{TopologyBuilder}} class. 
Please correct me if I am wrong.


was (Author: jeyhunkarimov):
[~mjsax] [~elevy] Thanks for your explanation. So basically, if I want to 
launch a new stream application and pass {{TimestampExtractor}} to it, I have 
to use the {{TimestampExtractor}} which other stream applications are using.
Because, the {{configuration}}s are global to all stream applications, 
basically I cannot define a new {{TimestampExtractor}} while other applications 
are running. Therefore, we want to pass {{TimestampExtractor}} not via 
{{configuration}} but with {{TopologyBuilder}} class. 
Please correct me if I am wrong.

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.





[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-01-22 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833708#comment-15833708
 ] 

Jeyhun Karimov commented on KAFKA-4144:
---

[~elevy] I am not sure I got your point. Please correct me if I am wrong in my 
assumptions. Let's look at the different possibilities:

1. We create separate {{builder.stream/table(topicName)}} instances in the 
application, so we clearly separate {{KTable}}/{{KStream}}s, one per topic. In 
this case, we can develop the solution described above: create different 
{{TimestampExtractor}}s and, as a result, assign one {{TimestampExtractor}} per 
topic.

2. If we create {{builder.stream/table(topicName1, topicName2 ...)}}, this can 
be challenging, as there is no clear separation of topics within the stream 
application. 

I think you pointed out the second case. As a solution, we can provide a 
{{Map}} (from topic name to {{TimestampExtractor}}) to the topology, indicating 
which {{TimestampExtractor}} is related to which topic. As a result, if the 
topics change at runtime, the stream application can handle it.

Please correct me if I am wrong in my assumptions.
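The map-based idea in the second case above could look roughly like this (all types are illustrative stand-ins, not Kafka classes):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a topic-to-extractor map so multi-topic sources can pick the
// right TimestampExtractor per record; unknown topics fall back to a default.
public class TopicExtractorMap {

    interface TimestampExtractor {
        long extract(long payloadTs);
    }

    private final Map<String, TimestampExtractor> perTopic = new HashMap<>();
    private final TimestampExtractor fallback = ts -> ts;

    void register(String topic, TimestampExtractor e) {
        perTopic.put(topic, e);
    }

    long extract(String topic, long payloadTs) {
        // Topics not registered (e.g. topics that appear at runtime) use the
        // default extractor instead of failing.
        return perTopic.getOrDefault(topic, fallback).extract(payloadTs);
    }
}
```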

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.





[jira] [Commented] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-01-22 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833607#comment-15833607
 ] 

Jeyhun Karimov commented on KAFKA-4304:
---

[~mjsax] Can we generalize this feature to {{top-k}} queries?

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: newbie++
>
> Currently, when querying a state store, it is not clear when the key was 
> updated last. The idea of this JIRA is to make the latest update timestamp 
> for each key-value-pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * if you want to consider only recently updated keys





[jira] [Assigned] (KAFKA-4613) Treat null-key records the same way for joins and aggregations

2017-01-22 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4613:
-

Assignee: Jeyhun Karimov

> Treat null-key records the same way for joins and aggregations
> -
>
> Key: KAFKA-4613
> URL: https://issues.apache.org/jira/browse/KAFKA-4613
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>
> Currently, on aggregation, records with a null key get dropped, while for joins 
> we raise an exception.
> We might want to drop in both cases or raise an exception in both cases to be 
> consistent.





[jira] [Comment Edited] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-01-22 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833131#comment-15833131
 ] 

Jeyhun Karimov edited comment on KAFKA-4144 at 1/22/17 3:06 PM:


[~elevy] After checking the code, I think we should add an extra 
{{timestampExtractor}} field to the {{SourceNode}} class. In this case there will 
be one {{timeStampExtractor}} per topic partition. As a result, in the 
{{StreamTask}} constructor, we can get access to a particular {{SourceNode}}'s 
{{timeStampExtractor}}. Please let me know your comments on this approach. 


was (Author: jeyhunkarimov):
[~elevy] After checking the code, I think we should add extra  {{ 
timestampextractor }} field to {{ SourceNode }} class. In this case there will 
be one {{ timeStampExtractor }} per topic partition. As a result, in {{ 
StreamTask}} constructor, we can get access to particular {{ SourceNode }}'s {{ 
timeStampExtractor }}. Please let me know your comments on this approach. 

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.





[jira] [Comment Edited] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-01-22 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833131#comment-15833131
 ] 

Jeyhun Karimov edited comment on KAFKA-4144 at 1/22/17 3:05 PM:


[~elevy] After checking the code, I think we should add extra  {{ 
timestampextractor }} field to {{ SourceNode }} class. In this case there will 
be one {{ timeStampExtractor }} per topic partition. As a result, in {{ 
StreamTask}} constructor, we can get access to particular {{ SourceNode }}'s {{ 
timeStampExtractor }}. Please let me know your comments on this approach. 


was (Author: jeyhunkarimov):
[~elevy] Shouldn't it be per partitioned stream/table? So, we can define 
'TimestampExtractor' per application, per stream/table and per partitioned 
stream/table ('KGroupedStream', 'KGroupedTable'), and the latter would override 
the former.

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.





[jira] [Assigned] (KAFKA-3835) Streams is creating two ProducerRecords for each send via RecordCollector

2017-01-21 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-3835:
-

Assignee: Jeyhun Karimov

> Streams is creating two ProducerRecords for each send via RecordCollector
> -
>
> Key: KAFKA-3835
> URL: https://issues.apache.org/jira/browse/KAFKA-3835
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Damian Guy
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: newbie
> Fix For: 0.10.3.0
>
>
> The RecordCollector.send(..) method below currently receives a 
> ProducerRecord from its caller and then creates another one to forward on to 
> its producer. The creation of two ProducerRecords should be removed.
> {code}
> public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> 
> keySerializer, Serializer<V> valueSerializer,
> StreamPartitioner<K, V> partitioner)
> {code}
> We could replace the above method with
> {code}
> public <K, V> void send(String topic,
> K key,
> V value,
> Integer partition,
> Long timestamp,
> Serializer<K> keySerializer,
> Serializer<V> valueSerializer,
> StreamPartitioner<K, V> partitioner)
> {code}
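A sketch of the suggested refactor: accepting the record fields directly lets the collector build the {{ProducerRecord}} exactly once. {{ProducerRecord}} and {{Serializer}} below are minimal stand-ins for the Kafka client classes:

```java
// Sketch: RecordCollector.send takes the raw fields, so only one
// ProducerRecord is allocated (after serialization) instead of two.
public class SingleRecordSend {

    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    static class ProducerRecord {
        final String topic;
        final byte[] key;
        final byte[] value;
        ProducerRecord(String topic, byte[] key, byte[] value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    static int recordsBuilt = 0;

    static <K, V> ProducerRecord send(String topic, K key, V value,
                                      Serializer<K> keySer, Serializer<V> valSer) {
        // One allocation here, instead of "caller builds one, collector
        // builds another".
        recordsBuilt++;
        return new ProducerRecord(topic,
                keySer.serialize(topic, key),
                valSer.serialize(topic, value));
    }
}
```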





[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-01-21 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833131#comment-15833131
 ] 

Jeyhun Karimov commented on KAFKA-4144:
---

[~elevy] Shouldn't it be per partitioned stream/table? So, we can define 
'TimestampExtractor' per application, per stream/table and per partitioned 
stream/table ('KGroupedStream', 'KGroupedTable'), and the latter would override 
the former.

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.





[jira] [Assigned] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-01-19 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4144:
-

Assignee: Jeyhun Karimov

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.





[jira] [Work started] (KAFKA-3856) Move inner classes accessible only functions in TopologyBuilder out of public APIs

2016-09-15 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3856 started by Jeyhun Karimov.
-
> Move inner classes accessible only functions in TopologyBuilder out of public 
> APIs
> --
>
> Key: KAFKA-3856
> URL: https://issues.apache.org/jira/browse/KAFKA-3856
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: api
>
> In {{TopologyBuilder}} there are a couple of public functions that are 
> actually only used in the internal classes such as StreamThread and 
> StreamPartitionAssignor, and some accessible only in high-level DSL inner 
> classes, examples include {{addInternalTopic}}, {{sourceGroups}} and 
> {{copartitionGroups}}, etc. But they are still listed in Javadocs since this 
> class is part of public APIs.
> We should think about moving them out of the public functions. Unfortunately 
> there is no "friend" access mode as in C++, so we need to think of another 
> way.





[jira] [Updated] (KAFKA-3825) Allow users to specify different types of state stores in Streams DSL

2016-09-15 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated KAFKA-3825:
--
Status: Patch Available  (was: Open)

> Allow users to specify different types of state stores in Streams DSL
> -
>
> Key: KAFKA-3825
> URL: https://issues.apache.org/jira/browse/KAFKA-3825
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: api
>
> Today the high-level Streams DSL uses hard-coded types of state stores (i.e. 
> persistent RocksDB) for their stateful operations. But for advanced users 
> they should be able to specify different types of state stores (in-memory, 
> persistent, customized) also in the DSL, instead of resorting to the 
> lower-level APIs.





[jira] [Commented] (KAFKA-3825) Allow users to specify different types of state stores in Streams DSL

2016-09-15 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15494574#comment-15494574
 ] 

Jeyhun Karimov commented on KAFKA-3825:
---

I am closing this issue as the PR has already been submitted. 

https://github.com/apache/kafka/pull/1588/commits/98a58d8241dbd95bbe10da220639ad0362259852



> Allow users to specify different types of state stores in Streams DSL
> -
>
> Key: KAFKA-3825
> URL: https://issues.apache.org/jira/browse/KAFKA-3825
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: api
>
> Today the high-level Streams DSL uses hard-coded types of state stores (i.e. 
> persistent RocksDB) for their stateful operations. But for advanced users 
> they should be able to specify different types of state stores (in-memory, 
> persistent, customized) also in the DSL, instead of resorting to the 
> lower-level APIs.





[jira] [Updated] (KAFKA-3184) Add Checkpoint for In-memory State Store

2016-09-15 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated KAFKA-3184:
--
Assignee: Guozhang Wang  (was: Jeyhun Karimov)

> Add Checkpoint for In-memory State Store
> 
>
> Key: KAFKA-3184
> URL: https://issues.apache.org/jira/browse/KAFKA-3184
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>  Labels: user-experience
> Fix For: 0.10.1.0
>
>
> Currently Kafka Streams does not make a checkpoint of the persistent state 
> store upon committing, which would be expensive since it is "stopping the 
> world" and writes to disk: for example, RocksDB would naively require copying 
> the whole file directory. 
> However, for in-memory stores checkpointing may be doable in an asynchronous 
> manner, hence it can be done quickly. And the benefit of having an intermediate 
> checkpoint is to avoid restoring from scratch if standby tasks are not 
> present.
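The asynchronous in-memory checkpointing idea can be sketched like this: take a cheap snapshot on commit and hand it to a background thread, so the processing thread does not "stop the world". This is a simplified illustration, not Kafka's implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of async checkpointing for an in-memory store: the caller only pays
// for a shallow copy; publishing (standing in for writing to disk) happens on
// a background thread.
public class AsyncCheckpoint {

    private final Map<String, String> store = new HashMap<>();
    private final AtomicReference<Map<String, String>> lastCheckpoint =
            new AtomicReference<>(new HashMap<>());

    void put(String k, String v) {
        store.put(k, v);
    }

    CompletableFuture<Void> checkpoint() {
        // Only this shallow copy blocks the processing thread.
        Map<String, String> snapshot = new HashMap<>(store);
        return CompletableFuture.runAsync(() -> {
            // A real implementation would persist the snapshot to disk here;
            // publishing it stands in for the durable checkpoint.
            lastCheckpoint.set(snapshot);
        });
    }

    Map<String, String> restore() {
        return lastCheckpoint.get();
    }
}
```

A standby-less instance could then restore from the last checkpoint instead of replaying the changelog from scratch.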





[jira] [Updated] (KAFKA-3184) Add Checkpoint for In-memory State Store

2016-09-15 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated KAFKA-3184:
--
Assignee: (was: Guozhang Wang)

> Add Checkpoint for In-memory State Store
> 
>
> Key: KAFKA-3184
> URL: https://issues.apache.org/jira/browse/KAFKA-3184
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: user-experience
> Fix For: 0.10.1.0
>
>
> Currently Kafka Streams does not make a checkpoint of the persistent state 
> store upon committing, which would be expensive since it is "stopping the 
> world" and writes to disk: for example, RocksDB would naively require copying 
> the whole file directory. 
> However, for in-memory stores checkpointing may be doable in an asynchronous 
> manner, hence it can be done quickly. And the benefit of having an intermediate 
> checkpoint is to avoid restoring from scratch if standby tasks are not 
> present.





[jira] [Assigned] (KAFKA-3184) Add Checkpoint for In-memory State Store

2016-09-07 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-3184:
-

Assignee: Jeyhun Karimov

> Add Checkpoint for In-memory State Store
> 
>
> Key: KAFKA-3184
> URL: https://issues.apache.org/jira/browse/KAFKA-3184
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: user-experience
> Fix For: 0.10.1.0
>
>
> Currently Kafka Streams does not checkpoint the persistent state store upon 
> committing, since doing so would be expensive: it "stops the world" and 
> writes to disk; a naive checkpoint of RocksDB, for example, would require 
> copying the entire file directory. 
> However, for in-memory stores checkpointing may be doable in an asynchronous 
> manner, and hence done quickly. The benefit of having intermediate 
> checkpoints is avoiding a restore from scratch when standby tasks are not 
> present.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3856) Move inner classes accessible only functions in TopologyBuilder out of public APIs

2016-07-06 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-3856:
-

Assignee: Jeyhun Karimov

> Move inner classes accessible only functions in TopologyBuilder out of public 
> APIs
> --
>
> Key: KAFKA-3856
> URL: https://issues.apache.org/jira/browse/KAFKA-3856
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: api
>
> In {{TopologyBuilder}} there are a couple of public functions that are 
> actually used only by internal classes such as StreamThread and 
> StreamPartitionAssignor, and some that are accessible only from high-level 
> DSL inner classes; examples include {{addInternalTopic}}, {{sourceGroups}} 
> and {{copartitionGroups}}. But they are still listed in the Javadocs since 
> this class is part of the public API.
> We should think about moving them out of the public API. Unfortunately there 
> is no "friend" access mode as in C++, so we need to think of another way.
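One common Java workaround for the missing "friend" mode can be sketched as follows. None of these classes are the real Kafka ones; the point is only that a package-private method stays callable by same-package internals while dropping out of the public Javadoc.

```java
import java.util.Set;

// The stand-in for an internal caller (e.g. StreamThread) lives in the same
// package as the builder, so it can still use the package-private helper.
public class StreamThreadSketch {
    static int internalTopicCount(TopologyBuilderSketch builder) {
        return builder.internalTopics().size(); // same package: access allowed
    }

    public static void main(String[] args) {
        System.out.println(internalTopicCount(new TopologyBuilderSketch())); // prints 1
    }
}

class TopologyBuilderSketch {
    // Was `public`; package-private now, so it no longer appears in the
    // public API surface or its Javadoc.
    Set<String> internalTopics() {
        return Set.of("repartition-topic");
    }
}
```

The cost of this approach is that all internal callers must share the builder's package, which is why projects sometimes introduce a dedicated `internals` accessor class instead.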



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3825) Allow users to specify different types of state stores in Streams DSL

2016-06-26 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350110#comment-15350110
 ] 

Jeyhun Karimov commented on KAFKA-3825:
---

[~guozhang] I am starting on this issue and would like to get your comments:

I want to overload the stateful operators by adding a new argument: State. If 
the user wants to specify the state store in the DSL, the new state store is 
defined and passed as an argument to the stateful operator. By default the 
stateful operators will use the default state stores, but the overloaded 
versions will allow more flexibility. In this way, we can assign different 
state stores to different stateful operators. 
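The overloading idea above can be sketched in plain Java with entirely illustrative names (CountOperator and its store supplier are not the real Streams API): the existing operator keeps its default store, and an overloaded variant accepts a user-supplied one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

public class CountOperator {
    private final Map<String, Long> store;

    // Default overload: behaves as today, with the built-in store.
    public CountOperator() {
        this(HashMap::new);
    }

    // New overload: the caller chooses the state store implementation.
    public CountOperator(Supplier<Map<String, Long>> storeSupplier) {
        this.store = storeSupplier.get();
    }

    // Increment and return the running count for a key.
    public long count(String key) {
        return store.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        CountOperator def = new CountOperator();                // default store
        CountOperator custom = new CountOperator(TreeMap::new); // custom store
        def.count("a");
        def.count("a");
        System.out.println(def.count("a"));    // prints 3
        System.out.println(custom.count("a")); // prints 1
    }
}
```

Because each operator instance owns its own store, different stateful operators in one topology can each be given a different store type, which is exactly the flexibility the proposal is after.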

> Allow users to specify different types of state stores in Streams DSL
> -
>
> Key: KAFKA-3825
> URL: https://issues.apache.org/jira/browse/KAFKA-3825
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: api
>
> Today the high-level Streams DSL uses a hard-coded type of state store 
> (i.e. persistent RocksDB) for its stateful operations. But advanced users 
> should be able to specify different types of state stores (in-memory, 
> persistent, customized) in the DSL as well, instead of resorting to the 
> lower-level APIs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3825) Allow users to specify different types of state stores in Streams DSL

2016-06-19 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-3825:
-

Assignee: Jeyhun Karimov

> Allow users to specify different types of state stores in Streams DSL
> -
>
> Key: KAFKA-3825
> URL: https://issues.apache.org/jira/browse/KAFKA-3825
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: api
>
> Today the high-level Streams DSL uses a hard-coded type of state store 
> (i.e. persistent RocksDB) for its stateful operations. But advanced users 
> should be able to specify different types of state stores (in-memory, 
> persistent, customized) in the DSL as well, instead of resorting to the 
> lower-level APIs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3836) KStreamReduce and KTableReduce should not pass nulls to Deserializers

2016-06-17 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-3836:
-

Assignee: Jeyhun Karimov

> KStreamReduce and KTableReduce should not pass nulls to Deserializers
> -
>
> Key: KAFKA-3836
> URL: https://issues.apache.org/jira/browse/KAFKA-3836
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Avi Flax
>Assignee: Jeyhun Karimov
>Priority: Trivial
>
> As per [this 
> discussion|http://mail-archives.apache.org/mod_mbox/kafka-users/201606.mbox/%3ccahwhrru29jw4jgvhsijwbvlzb3bc6qz6pbh9tqcfbcorjk4...@mail.gmail.com%3e]
>  these classes currently pass null values along to Deserializers, so those 
> Deserializers need to handle null inputs and pass them through without 
> throwing. It would be better for these classes to simply not call the 
> Deserializers in this case; this would reduce the burden on implementers of 
> {{Deserializer}}.
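The suggested guard can be sketched as follows. The nested Deserializer interface here is only a stand-in for the real org.apache.kafka.common.serialization.Deserializer, and deserializeOrNull is a hypothetical helper: the caller short-circuits null payloads so implementations never see null.

```java
import java.nio.charset.StandardCharsets;

public class NullSafeDeserialize {
    // Stand-in for the Kafka Deserializer interface.
    interface Deserializer<T> {
        T deserialize(byte[] data);
    }

    // Skip the deserializer entirely on null input; a null payload simply
    // stays null, so implementations never need a null check of their own.
    static <T> T deserializeOrNull(Deserializer<T> d, byte[] data) {
        return (data == null) ? null : d.deserialize(data);
    }

    public static void main(String[] args) {
        Deserializer<String> utf8 = b -> new String(b, StandardCharsets.UTF_8);
        System.out.println(deserializeOrNull(utf8, "hi".getBytes(StandardCharsets.UTF_8))); // prints hi
        System.out.println(deserializeOrNull(utf8, null)); // prints null
    }
}
```

Note that the UTF-8 lambda above would throw a NullPointerException if handed null directly, which is exactly the implementer burden the issue wants to remove.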



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3839) Handling null keys in KTable.groupBy

2016-06-17 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336310#comment-15336310
 ] 

Jeyhun Karimov commented on KAFKA-3839:
---

It seems that the groupBy operator is gone from KTable. Nevertheless, I checked 
its behaviour with null keys:

{code}
KTable<Long, String> source = builder.table(longSerde, stringSerde, "topic1");
KTable<String, Long> counts = source
    .groupBy(new KeyValueMapper<Long, String, KeyValue<String, String>>() {
        @Override
        public KeyValue<String, String> apply(Long key, String value) {
            return KeyValue.pair(null, value);
        }
    }, Serdes.String(), Serdes.String())
    .count("count");
counts.to(stringSerde, longSerde, "topic2");
{code}

If I run this code, no exception or error occurs and nothing is output to the 
topic (as expected). I checked the class KTableRepartitionMap.KTableMapProcessor 
and I think the required null checks are there in the process() method.





> Handling null keys in KTable.groupBy
> 
>
> Key: KAFKA-3839
> URL: https://issues.apache.org/jira/browse/KAFKA-3839
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: architecture, newbie
>
> This was first reported by [~jeff.klu...@gmail.com].
> Just as we handle null keys in KStream joins and aggregations in KAFKA-3561 
> (https://github.com/apache/kafka/pull/1472), for KTable.groupBy operators we 
> should also gracefully handle null keys by filtering them out, since they 
> will not participate in the aggregation.
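The filter-out behaviour can be sketched with plain java.util.stream, independent of Streams itself (NullKeyFilter is an illustrative name): records whose mapped key is null are dropped before the aggregation ever sees them.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NullKeyFilter {
    // Count records per key, silently dropping null keys instead of throwing.
    public static Map<String, Long> countByKey(Stream<Map.Entry<String, String>> records) {
        return records
                .filter(r -> r.getKey() != null) // graceful handling: skip null keys
                .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countByKey(Stream.of(
                new SimpleEntry<>("a", "x"),
                new SimpleEntry<String, String>(null, "y"), // dropped
                new SimpleEntry<>("a", "z")));
        System.out.println(counts); // prints {a=2}
    }
}
```

Without the filter, `Collectors.groupingBy` would throw a NullPointerException on the null key, which mirrors why the operator must drop such records up front.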



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-3839) Handling null keys in KTable.groupBy

2016-06-15 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3839 started by Jeyhun Karimov.
-
> Handling null keys in KTable.groupBy
> 
>
> Key: KAFKA-3839
> URL: https://issues.apache.org/jira/browse/KAFKA-3839
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>  Labels: architecture, newbie
>
> This was first reported by [~jeff.klu...@gmail.com].
> Just as we handle null keys in KStream joins and aggregations in KAFKA-3561 
> (https://github.com/apache/kafka/pull/1472), for KTable.groupBy operators we 
> should also gracefully handle null keys by filtering them out, since they 
> will not participate in the aggregation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3839) Handling null keys in KTable.groupBy

2016-06-14 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15330865#comment-15330865
 ] 

Jeyhun Karimov commented on KAFKA-3839:
---

I can take this task as a starter.

> Handling null keys in KTable.groupBy
> 
>
> Key: KAFKA-3839
> URL: https://issues.apache.org/jira/browse/KAFKA-3839
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture, newbie
>
> This was first reported by [~jeff.klu...@gmail.com].
> Just as we handle null keys in KStream joins and aggregations in KAFKA-3561 
> (https://github.com/apache/kafka/pull/1472), for KTable.groupBy operators we 
> should also gracefully handle null keys by filtering them out, since they 
> will not participate in the aggregation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)